diff --git a/app/router/__init__.py b/app/router/__init__.py index e0efa4a..4316997 100644 --- a/app/router/__init__.py +++ b/app/router/__init__.py @@ -1,7 +1,6 @@ from __future__ import annotations -from app.signalr import signalr_router as signalr_router - +# from app.signalr import signalr_router as signalr_router from .auth import router as auth_router from .fetcher import fetcher_router as fetcher_router from .file import file_router as file_router @@ -26,5 +25,5 @@ __all__ = [ "private_router", "redirect_api_router", "redirect_router", - "signalr_router", + # "signalr_router", ] diff --git a/app/router/private/__init__.py b/app/router/private/__init__.py index 3858f0e..dc291c6 100644 --- a/app/router/private/__init__.py +++ b/app/router/private/__init__.py @@ -1,6 +1,6 @@ from __future__ import annotations -from . import avatar, cover, oauth, relationship, stats, team, username # noqa: F401 +from . import avatar, cover, oauth, relationship, team, username # noqa: F401 from .router import router as private_router __all__ = [ diff --git a/app/router/private/stats.py b/app/router/private/stats.py deleted file mode 100644 index 327d4e0..0000000 --- a/app/router/private/stats.py +++ /dev/null @@ -1,356 +0,0 @@ -from __future__ import annotations - -import asyncio -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime -import json - -from app.dependencies.database import get_redis, get_redis_message -from app.log import logger -from app.utils import bg_tasks, utcnow - -from .router import router - -from pydantic import BaseModel - -# Redis key constants -REDIS_ONLINE_USERS_KEY = "server:online_users" -REDIS_PLAYING_USERS_KEY = "server:playing_users" -REDIS_REGISTERED_USERS_KEY = "server:registered_users" -REDIS_ONLINE_HISTORY_KEY = "server:online_history" - -# 线程池用于同步Redis操作 -_executor = ThreadPoolExecutor(max_workers=2) - - -async def _redis_exec(func, *args, **kwargs): - """在线程池中执行同步Redis操作""" - loop = asyncio.get_event_loop() - return await loop.run_in_executor(_executor, func, *args, **kwargs) - - -class ServerStats(BaseModel): - """服务器统计信息响应模型""" - - registered_users: int - online_users: int - playing_users: int - timestamp: datetime - - -class OnlineHistoryPoint(BaseModel): - """在线历史数据点""" - - timestamp: datetime - online_count: int - playing_count: int - - -class OnlineHistoryResponse(BaseModel): - """24小时在线历史响应模型""" - - history: list[OnlineHistoryPoint] - current_stats: ServerStats - - -@router.get("/stats", response_model=ServerStats, tags=["统计"]) -async def get_server_stats() -> ServerStats: - """ - 获取服务器实时统计信息 - - 返回服务器注册用户数、在线用户数、正在游玩用户数等实时统计信息 - """ - redis = get_redis() - - try: - # 并行获取所有统计数据 - registered_count, online_count, playing_count = await asyncio.gather( - _get_registered_users_count(redis), - _get_online_users_count(redis), - _get_playing_users_count(redis), - ) - - return ServerStats( - registered_users=registered_count, - online_users=online_count, - playing_users=playing_count, - timestamp=utcnow(), - ) - except Exception as e: - logger.error(f"Error getting server stats: {e}") - # 返回默认值 - return ServerStats( - registered_users=0, - online_users=0, - playing_users=0, - timestamp=utcnow(), - ) - - -@router.get("/stats/history", response_model=OnlineHistoryResponse, tags=["统计"]) -async def get_online_history() -> OnlineHistoryResponse: - """ - 获取最近24小时在线统计历史 - - 返回过去24小时内每小时的在线用户数和游玩用户数统计, - 包含当前实时数据作为最新数据点 - """ - try: - # 获取历史数据 - 使用同步Redis客户端 - redis_sync = get_redis_message() - history_data = await _redis_exec(redis_sync.lrange, REDIS_ONLINE_HISTORY_KEY, 0, -1) - history_points = [] - - # 处理历史数据 - for data in history_data: - try: - point_data = json.loads(data) - # 只保留基本字段 - history_points.append( - OnlineHistoryPoint( - timestamp=datetime.fromisoformat(point_data["timestamp"]), - online_count=point_data["online_count"], - playing_count=point_data["playing_count"], - ) - ) - except (json.JSONDecodeError, KeyError, ValueError) as e: - logger.warning(f"Invalid history data point: {data}, error: {e}") - continue - - # 获取当前实时统计信息 - current_stats = await get_server_stats() - - # 如果历史数据为空或者最新数据超过15分钟,添加当前数据点 - if not history_points or ( - history_points - and (current_stats.timestamp - max(history_points, key=lambda x: x.timestamp).timestamp).total_seconds() - > 15 * 60 - ): - history_points.append( - OnlineHistoryPoint( - timestamp=current_stats.timestamp, - online_count=current_stats.online_users, - playing_count=current_stats.playing_users, - ) - ) - - # 按时间排序(最新的在前) - history_points.sort(key=lambda x: x.timestamp, reverse=True) - - # 限制到最多48个数据点(24小时) - history_points = history_points[:48] - - return OnlineHistoryResponse(history=history_points, current_stats=current_stats) - except Exception as e: - logger.error(f"Error getting online history: {e}") - # 返回空历史和当前状态 - current_stats = await get_server_stats() - return OnlineHistoryResponse(history=[], current_stats=current_stats) - - -@router.get("/stats/debug", tags=["统计"]) -async def get_stats_debug_info(): - """ - 获取统计系统调试信息 - - 用于调试时间对齐和区间统计问题 - """ - try: - from app.service.enhanced_interval_stats import EnhancedIntervalStatsManager - - current_time = utcnow() - current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() - interval_stats = await EnhancedIntervalStatsManager.get_current_interval_stats() - - # 获取Redis中的实际数据 - redis_sync = get_redis_message() - - online_key = f"server:interval_online_users:{current_interval.interval_key}" - playing_key = f"server:interval_playing_users:{current_interval.interval_key}" - - online_users_raw = await _redis_exec(redis_sync.smembers, online_key) - playing_users_raw = await _redis_exec(redis_sync.smembers, playing_key) - - online_users = [int(uid.decode() if isinstance(uid, bytes) else uid) for uid in online_users_raw] - playing_users = [int(uid.decode() if isinstance(uid, bytes) else uid) for uid in playing_users_raw] - - return { - "current_time": current_time.isoformat(), - "current_interval": { - "start_time": current_interval.start_time.isoformat(), - "end_time": current_interval.end_time.isoformat(), - "key": current_interval.interval_key, - "is_current": current_interval.is_current(), - "minutes_remaining": int((current_interval.end_time - current_time).total_seconds() / 60), - "seconds_remaining": int((current_interval.end_time - current_time).total_seconds()), - "progress_percentage": round( - (1 - (current_interval.end_time - current_time).total_seconds() / (30 * 60)) * 100, - 1, - ), - }, - "interval_statistics": interval_stats.to_dict() if interval_stats else None, - "redis_data": { - "online_users": online_users, - "playing_users": playing_users, - "online_count": len(online_users), - "playing_count": len(playing_users), - }, - "system_status": { - "stats_system": "enhanced_interval_stats", - "data_alignment": "30_minute_boundaries", - "real_time_updates": True, - "auto_24h_fill": True, - }, - } - except Exception as e: - logger.error(f"Error getting debug info: {e}") - return {"error": "Failed to retrieve debug information", "message": str(e)} - - -async def _get_registered_users_count(redis) -> int: - """获取注册用户总数(从缓存)""" - try: - count = await redis.get(REDIS_REGISTERED_USERS_KEY) - return int(count) if count else 0 - except Exception as e: - logger.error(f"Error getting registered users count: {e}") - return 0 - - -async def _get_online_users_count(redis) -> int: - """获取当前在线用户数""" - try: - count = await redis.scard(REDIS_ONLINE_USERS_KEY) - return count - except Exception as e: - logger.error(f"Error getting online users count: {e}") - return 0 - - -async def _get_playing_users_count(redis) -> int: - """获取当前游玩用户数""" - try: - count = await redis.scard(REDIS_PLAYING_USERS_KEY) - return count - except Exception as e: - logger.error(f"Error getting playing users count: {e}") - return 0 - - -# 统计更新功能 -async def update_registered_users_count() -> None: - """更新注册用户数缓存""" - from app.const import BANCHOBOT_ID - from app.database import User - from app.dependencies.database import with_db - - from sqlmodel import func, select - - redis = get_redis() - try: - async with with_db() as db: - # 排除机器人用户(BANCHOBOT_ID) - result = await db.exec(select(func.count()).select_from(User).where(User.id != BANCHOBOT_ID)) - count = result.first() - await redis.set(REDIS_REGISTERED_USERS_KEY, count or 0, ex=300) # 5分钟过期 - logger.debug(f"Updated registered users count: {count}") - except Exception as e: - logger.error(f"Error updating registered users count: {e}") - - -async def add_online_user(user_id: int) -> None: - """添加在线用户""" - redis_sync = get_redis_message() - redis_async = get_redis() - try: - await _redis_exec(redis_sync.sadd, REDIS_ONLINE_USERS_KEY, str(user_id)) - # 检查key是否已有过期时间,如果没有则设置3小时过期 - ttl = await redis_async.ttl(REDIS_ONLINE_USERS_KEY) - if ttl <= 0: # -1表示永不过期,-2表示不存在,0表示已过期 - await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3 * 3600) # 3小时过期 - logger.debug(f"Added online user {user_id}") - - # 立即更新当前区间统计 - from app.service.enhanced_interval_stats import update_user_activity_in_interval - - bg_tasks.add_task( - update_user_activity_in_interval, - user_id, - is_playing=False, - ) - - except Exception as e: - logger.error(f"Error adding online user {user_id}: {e}") - - -async def remove_online_user(user_id: int) -> None: - """移除在线用户""" - redis_sync = get_redis_message() - try: - await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, str(user_id)) - await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, str(user_id)) - except Exception as e: - logger.error(f"Error removing online user {user_id}: {e}") - - -async def add_playing_user(user_id: int) -> None: - """添加游玩用户""" - redis_sync = get_redis_message() - redis_async = get_redis() - try: - await _redis_exec(redis_sync.sadd, REDIS_PLAYING_USERS_KEY, str(user_id)) - # 检查key是否已有过期时间,如果没有则设置3小时过期 - ttl = await redis_async.ttl(REDIS_PLAYING_USERS_KEY) - if ttl <= 0: # -1表示永不过期,-2表示不存在,0表示已过期 - await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3 * 3600) # 3小时过期 - logger.debug(f"Added playing user {user_id}") - - # 立即更新当前区间统计 - from app.service.enhanced_interval_stats import update_user_activity_in_interval - - bg_tasks.add_task(update_user_activity_in_interval, user_id, is_playing=True) - - except Exception as e: - logger.error(f"Error adding playing user {user_id}: {e}") - - -async def remove_playing_user(user_id: int) -> None: - """移除游玩用户""" - redis_sync = get_redis_message() - try: - await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, str(user_id)) - except Exception as e: - logger.error(f"Error removing playing user {user_id}: {e}") - - -async def record_hourly_stats() -> None: - """记录统计数据 - 简化版本,主要作为fallback使用""" - redis_sync = get_redis_message() - redis_async = get_redis() - try: - # 先确保Redis连接正常 - await redis_async.ping() - - online_count = await _get_online_users_count(redis_async) - playing_count = await _get_playing_users_count(redis_async) - - current_time = utcnow() - history_point = { - "timestamp": current_time.isoformat(), - "online_count": online_count, - "playing_count": playing_count, - } - - # 添加到历史记录 - await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)) - # 只保留48个数据点(24小时,每30分钟一个点) - await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) - # 设置过期时间为26小时,确保有足够缓冲 - await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) - - logger.info( - f"Recorded fallback stats: online={online_count}, playing={playing_count} " - f"at {current_time.strftime('%H:%M:%S')}" - ) - except Exception as e: - logger.error(f"Error recording fallback stats: {e}") diff --git a/app/service/enhanced_interval_stats.py b/app/service/enhanced_interval_stats.py deleted file mode 100644 index a63a662..0000000 --- a/app/service/enhanced_interval_stats.py +++ /dev/null @@ -1,468 +0,0 @@ -""" -重构的区间统计系统 - 真正统计半小时区间内的用户活跃情况 -""" - -from __future__ import annotations - -from dataclasses import dataclass -from datetime import UTC, datetime, timedelta -import json - -from app.dependencies.database import get_redis, get_redis_message -from app.log import logger -from app.router.private.stats import ( - REDIS_ONLINE_HISTORY_KEY, - _get_online_users_count, - _get_playing_users_count, - _redis_exec, -) -from app.utils import utcnow - -# Redis keys for interval statistics -INTERVAL_STATS_BASE_KEY = "server:interval_stats" -INTERVAL_ONLINE_USERS_KEY = "server:interval_online_users" # 区间内在线用户集合 -INTERVAL_PLAYING_USERS_KEY = "server:interval_playing_users" # 区间内游玩用户集合 -CURRENT_INTERVAL_INFO_KEY = "server:current_interval_info" # 当前区间信息 - - -@dataclass -class IntervalInfo: - """区间信息""" - - start_time: datetime - end_time: datetime - interval_key: str - - def is_current(self) -> bool: - """检查是否是当前区间""" - now = utcnow() - return self.start_time <= now < self.end_time - - def to_dict(self) -> dict: - return { - "start_time": self.start_time.isoformat(), - "end_time": self.end_time.isoformat(), - "interval_key": self.interval_key, - } - - @classmethod - def from_dict(cls, data: dict) -> "IntervalInfo": - return cls( - start_time=datetime.fromisoformat(data["start_time"]), - end_time=datetime.fromisoformat(data["end_time"]), - interval_key=data["interval_key"], - ) - - -@dataclass -class IntervalStats: - """区间统计数据""" - - interval_key: str - start_time: datetime - end_time: datetime - unique_online_users: int # 区间内独特在线用户数 - unique_playing_users: int # 区间内独特游玩用户数 - peak_online_count: int # 区间内在线用户数峰值 - peak_playing_count: int # 区间内游玩用户数峰值 - total_samples: int # 采样次数 - created_at: datetime - - def to_dict(self) -> dict: - return { - "interval_key": self.interval_key, - "start_time": self.start_time.isoformat(), - "end_time": self.end_time.isoformat(), - "unique_online_users": self.unique_online_users, - "unique_playing_users": self.unique_playing_users, - "peak_online_count": self.peak_online_count, - "peak_playing_count": self.peak_playing_count, - "total_samples": self.total_samples, - "created_at": self.created_at.isoformat(), - } - - @classmethod - def from_dict(cls, data: dict) -> "IntervalStats": - return cls( - interval_key=data["interval_key"], - start_time=datetime.fromisoformat(data["start_time"]), - end_time=datetime.fromisoformat(data["end_time"]), - unique_online_users=data["unique_online_users"], - unique_playing_users=data["unique_playing_users"], - peak_online_count=data["peak_online_count"], - peak_playing_count=data["peak_playing_count"], - total_samples=data["total_samples"], - created_at=datetime.fromisoformat(data["created_at"]), - ) - - -class EnhancedIntervalStatsManager: - """增强的区间统计管理器 - 真正统计半小时区间内的用户活跃情况""" - - @staticmethod - def get_current_interval_boundaries() -> tuple[datetime, datetime]: - """获取当前30分钟区间的边界""" - now = utcnow() - # 计算区间开始时间(向下取整到最近的30分钟) - minute = (now.minute // 30) * 30 - start_time = now.replace(minute=minute, second=0, microsecond=0) - # 区间结束时间 - end_time = start_time + timedelta(minutes=30) - return start_time, end_time - - @staticmethod - def generate_interval_key(start_time: datetime) -> str: - """生成区间唯一标识""" - return f"{INTERVAL_STATS_BASE_KEY}:{start_time.strftime('%Y%m%d_%H%M')}" - - @staticmethod - async def get_current_interval_info() -> IntervalInfo: - """获取当前区间信息""" - start_time, end_time = EnhancedIntervalStatsManager.get_current_interval_boundaries() - interval_key = EnhancedIntervalStatsManager.generate_interval_key(start_time) - - return IntervalInfo(start_time=start_time, end_time=end_time, interval_key=interval_key) - - @staticmethod - async def initialize_current_interval() -> None: - """初始化当前区间""" - redis_sync = get_redis_message() - redis_async = get_redis() - - try: - current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() - - # 存储当前区间信息 - await _redis_exec( - redis_sync.set, - CURRENT_INTERVAL_INFO_KEY, - json.dumps(current_interval.to_dict()), - ) - await redis_async.expire(CURRENT_INTERVAL_INFO_KEY, 35 * 60) # 35分钟过期 - - # 初始化区间用户集合(如果不存在) - online_key = f"{INTERVAL_ONLINE_USERS_KEY}:{current_interval.interval_key}" - playing_key = f"{INTERVAL_PLAYING_USERS_KEY}:{current_interval.interval_key}" - - # 设置过期时间为35分钟 - await redis_async.expire(online_key, 35 * 60) - await redis_async.expire(playing_key, 35 * 60) - - # 初始化区间统计记录 - stats = IntervalStats( - interval_key=current_interval.interval_key, - start_time=current_interval.start_time, - end_time=current_interval.end_time, - unique_online_users=0, - unique_playing_users=0, - peak_online_count=0, - peak_playing_count=0, - total_samples=0, - created_at=utcnow(), - ) - - await _redis_exec( - redis_sync.set, - current_interval.interval_key, - json.dumps(stats.to_dict()), - ) - await redis_async.expire(current_interval.interval_key, 35 * 60) - - # 如果历史记录为空,自动填充前24小时数据为0 - await EnhancedIntervalStatsManager._ensure_24h_history_exists() - - logger.info( - f"Initialized interval stats for {current_interval.start_time.strftime('%H:%M')}" - f" - {current_interval.end_time.strftime('%H:%M')}" - ) - - except Exception as e: - logger.error(f"Error initializing current interval: {e}") - - @staticmethod - async def _ensure_24h_history_exists() -> None: - """确保24小时历史数据存在,不存在则用0填充""" - redis_sync = get_redis_message() - redis_async = get_redis() - - try: - # 检查现有历史数据数量 - history_length = await _redis_exec(redis_sync.llen, REDIS_ONLINE_HISTORY_KEY) - - if history_length < 48: # 少于48个数据点(24小时*2) - logger.info(f"History has only {history_length} points, filling with zeros for 24h") - - # 计算需要填充的数据点数量 - needed_points = 48 - history_length - - # 从当前时间往前推,创建缺失的时间点(都填充为0) - current_time = utcnow() # noqa: F841 - current_interval_start, _ = EnhancedIntervalStatsManager.get_current_interval_boundaries() - - # 从当前区间开始往前推,创建历史数据点(确保时间对齐到30分钟边界) - fill_points = [] - for i in range(needed_points): - # 每次往前推30分钟,确保时间对齐 - point_time = current_interval_start - timedelta(minutes=30 * (i + 1)) - - # 确保时间对齐到30分钟边界 - aligned_minute = (point_time.minute // 30) * 30 - point_time = point_time.replace(minute=aligned_minute, second=0, microsecond=0) - - history_point = { - "timestamp": point_time.isoformat(), - "online_count": 0, - "playing_count": 0, - } - fill_points.append(json.dumps(history_point)) - - # 将填充数据添加到历史记录末尾(最旧的数据) - if fill_points: - # 先将现有数据转移到临时位置 - temp_key = f"{REDIS_ONLINE_HISTORY_KEY}_temp" - if history_length > 0: - # 复制现有数据到临时key - existing_data = await _redis_exec(redis_sync.lrange, REDIS_ONLINE_HISTORY_KEY, 0, -1) - if existing_data: - for data in existing_data: - await _redis_exec(redis_sync.rpush, temp_key, data) - - # 清空原有key - await redis_async.delete(REDIS_ONLINE_HISTORY_KEY) - - # 先添加填充数据(最旧的) - for point in reversed(fill_points): # 反向添加,最旧的在最后 - await _redis_exec(redis_sync.rpush, REDIS_ONLINE_HISTORY_KEY, point) - - # 再添加原有数据(较新的) - if history_length > 0: - existing_data = await _redis_exec(redis_sync.lrange, temp_key, 0, -1) - for data in existing_data: - await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, data) - - # 清理临时key - await redis_async.delete(temp_key) - - # 确保只保留48个数据点 - await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) - - # 设置过期时间 - await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) - - logger.info(f"Filled {len(fill_points)} historical data points with zeros") - - except Exception as e: - logger.error(f"Error ensuring 24h history exists: {e}") - - @staticmethod - async def add_user_to_interval(user_id: int, is_playing: bool = False) -> None: - """添加用户到当前区间统计 - 实时更新当前运行的区间""" - redis_sync = get_redis_message() - redis_async = get_redis() - - try: - current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() - - # 添加到区间在线用户集合 - online_key = f"{INTERVAL_ONLINE_USERS_KEY}:{current_interval.interval_key}" - await _redis_exec(redis_sync.sadd, online_key, str(user_id)) - await redis_async.expire(online_key, 35 * 60) - - # 如果用户在游玩,也添加到游玩用户集合 - if is_playing: - playing_key = f"{INTERVAL_PLAYING_USERS_KEY}:{current_interval.interval_key}" - await _redis_exec(redis_sync.sadd, playing_key, str(user_id)) - await redis_async.expire(playing_key, 35 * 60) - - # 立即更新区间统计(同步更新,确保数据实时性) - await EnhancedIntervalStatsManager._update_interval_stats() - - logger.debug( - f"Added user {user_id} to current interval {current_interval.start_time.strftime('%H:%M')}" - f"-{current_interval.end_time.strftime('%H:%M')}" - ) - - except Exception as e: - logger.error(f"Error adding user {user_id} to interval: {e}") - - @staticmethod - async def _update_interval_stats() -> None: - """更新当前区间统计 - 立即同步更新""" - redis_sync = get_redis_message() - redis_async = get_redis() - - try: - current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() - - # 获取区间内独特用户数 - online_key = f"{INTERVAL_ONLINE_USERS_KEY}:{current_interval.interval_key}" - playing_key = f"{INTERVAL_PLAYING_USERS_KEY}:{current_interval.interval_key}" - - unique_online = await _redis_exec(redis_sync.scard, online_key) - unique_playing = await _redis_exec(redis_sync.scard, playing_key) - - # 获取当前实时用户数作为峰值参考 - current_online = await _get_online_users_count(redis_async) - current_playing = await _get_playing_users_count(redis_async) - - # 获取现有统计数据 - existing_data = await _redis_exec(redis_sync.get, current_interval.interval_key) - if existing_data: - stats = IntervalStats.from_dict(json.loads(existing_data)) - # 更新峰值 - stats.peak_online_count = max(stats.peak_online_count, current_online) - stats.peak_playing_count = max(stats.peak_playing_count, current_playing) - stats.total_samples += 1 - else: - # 创建新的统计记录 - stats = IntervalStats( - interval_key=current_interval.interval_key, - start_time=current_interval.start_time, - end_time=current_interval.end_time, - unique_online_users=0, - unique_playing_users=0, - peak_online_count=current_online, - peak_playing_count=current_playing, - total_samples=1, - created_at=utcnow(), - ) - - # 更新独特用户数 - stats.unique_online_users = unique_online - stats.unique_playing_users = unique_playing - - # 立即保存更新的统计数据 - await _redis_exec( - redis_sync.set, - current_interval.interval_key, - json.dumps(stats.to_dict()), - ) - await redis_async.expire(current_interval.interval_key, 35 * 60) - - logger.debug( - f"Updated interval stats: online={unique_online}, playing={unique_playing}, " - f"peak_online={stats.peak_online_count}, peak_playing={stats.peak_playing_count}" - ) - - except Exception as e: - logger.error(f"Error updating interval stats: {e}") - - @staticmethod - async def finalize_interval() -> IntervalStats | None: - """完成上一个已结束的区间统计并保存到历史""" - redis_sync = get_redis_message() - redis_async = get_redis() - - try: - # 获取上一个已完成区间(当前区间的前一个) - current_start, current_end = EnhancedIntervalStatsManager.get_current_interval_boundaries() - # 上一个区间开始时间是当前区间开始时间减去30分钟 - previous_start = current_start - timedelta(minutes=30) - previous_end = current_start # 上一个区间的结束时间就是当前区间的开始时间 - - interval_key = EnhancedIntervalStatsManager.generate_interval_key(previous_start) - - previous_interval = IntervalInfo( - start_time=previous_start, - end_time=previous_end, - interval_key=interval_key, - ) - - # 获取最终统计数据 - stats_data = await _redis_exec(redis_sync.get, previous_interval.interval_key) - if not stats_data: - logger.warning( - f"No interval stats found to finalize for {previous_interval.start_time.strftime('%H:%M')}" - ) - return None - - stats = IntervalStats.from_dict(json.loads(stats_data)) - - # 创建历史记录点(使用区间开始时间作为时间戳) - history_point = { - "timestamp": previous_interval.start_time.isoformat(), - "online_count": stats.unique_online_users, - "playing_count": stats.unique_playing_users, - } - - # 添加到历史记录 - await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)) - await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) - await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) - - logger.info( - f"Finalized interval stats: " - f"unique_online={stats.unique_online_users}, " - f"unique_playing={stats.unique_playing_users}, " - f"peak_online={stats.peak_online_count}, " - f"peak_playing={stats.peak_playing_count}, " - f"samples={stats.total_samples} " - f"for {stats.start_time.strftime('%H:%M')}-{stats.end_time.strftime('%H:%M')}" - ) - - return stats - - except Exception as e: - logger.error(f"Error finalizing interval stats: {e}") - return None - - @staticmethod - async def get_current_interval_stats() -> IntervalStats | None: - """获取当前区间统计""" - redis_sync = get_redis_message() - - try: - current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() - stats_data = await _redis_exec(redis_sync.get, current_interval.interval_key) - - if stats_data: - return IntervalStats.from_dict(json.loads(stats_data)) - return None - - except Exception as e: - logger.error(f"Error getting current interval stats: {e}") - return None - - @staticmethod - async def cleanup_old_intervals() -> None: - """清理过期的区间数据""" - redis_async = get_redis() - - try: - # 删除过期的区间统计数据(超过2小时的) - cutoff_time = utcnow() - timedelta(hours=2) - pattern = f"{INTERVAL_STATS_BASE_KEY}:*" - - keys = await redis_async.keys(pattern) - for key in keys: - try: - # 从key中提取时间,处理字节或字符串类型 - if isinstance(key, bytes): - key_str = key.decode() - else: - key_str = key - time_part = key_str.split(":")[-1] # YYYYMMDD_HHMM格式 - # 将时区无关的datetime转换为UTC时区感知的datetime进行比较 - key_time = datetime.strptime(time_part, "%Y%m%d_%H%M").replace(tzinfo=UTC) - - if key_time < cutoff_time: - await redis_async.delete(key) - # 也删除对应的用户集合 - # 使用key_str确保正确拼接用户集合键 - await redis_async.delete(f"{INTERVAL_ONLINE_USERS_KEY}:{key_str}") - await redis_async.delete(f"{INTERVAL_PLAYING_USERS_KEY}:{key}") - - except (ValueError, IndexError): - # 忽略解析错误的key - continue - - logger.debug("Cleaned up old interval data") - - except Exception as e: - logger.error(f"Error cleaning up old intervals: {e}") - - -# 便捷函数,用于替换现有的统计更新函数 -async def update_user_activity_in_interval(user_id: int, is_playing: bool = False) -> None: - """用户活动时更新区间统计(在登录、开始游玩等时调用)""" - await EnhancedIntervalStatsManager.add_user_to_interval(user_id, is_playing) diff --git a/app/service/online_status_maintenance.py b/app/service/online_status_maintenance.py deleted file mode 100644 index c91f492..0000000 --- a/app/service/online_status_maintenance.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -在线状态维护服务 - -此模块提供在游玩状态下维护用户在线状态的功能, -解决游玩时显示离线的问题。 -""" - -from __future__ import annotations - -import asyncio - -from app.dependencies.database import get_redis -from app.log import logger -from app.router.private.stats import REDIS_PLAYING_USERS_KEY, _redis_exec, get_redis_message - - -async def maintain_playing_users_online_status(): - """ - 维护正在游玩用户的在线状态 - - 定期刷新正在游玩用户的metadata在线标记, - 确保他们在游玩过程中显示为在线状态。 - """ - redis_sync = get_redis_message() - redis_async = get_redis() - - try: - # 获取所有正在游玩的用户 - playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY) - - if not playing_users: - return - - logger.debug(f"Maintaining online status for {len(playing_users)} playing users") - - # 为每个游玩用户刷新metadata在线标记 - for user_id in playing_users: - user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) - metadata_key = f"metadata:online:{user_id_str}" - - # 设置或刷新metadata在线标记,过期时间为1小时 - await redis_async.set(metadata_key, "playing", ex=3600) - - logger.debug(f"Updated metadata online status for {len(playing_users)} playing users") - - except Exception as e: - logger.error(f"Error maintaining playing users online status: {e}") - - -async def start_online_status_maintenance_task(): - """ - 启动在线状态维护任务 - - 每5分钟运行一次维护任务,确保游玩用户保持在线状态 - """ - logger.info("Starting online status maintenance task") - - while True: - try: - await maintain_playing_users_online_status() - # 每5分钟运行一次 - await asyncio.sleep(300) - except Exception as e: - logger.error(f"Error in online status maintenance task: {e}") - # 出错后等待30秒再重试 - await asyncio.sleep(30) - - -def schedule_online_status_maintenance(): - """ - 调度在线状态维护任务 - """ - task = asyncio.create_task(start_online_status_maintenance_task()) - return task diff --git a/app/service/online_status_manager.py b/app/service/online_status_manager.py deleted file mode 100644 index 8200c43..0000000 --- a/app/service/online_status_manager.py +++ /dev/null @@ -1,136 +0,0 @@ -""" -在线状态管理服务 - -此模块负责统一管理用户的在线状态,确保用户在连接WebSocket后立即显示为在线。 -""" - -from __future__ import annotations - -from app.dependencies.database import get_redis -from app.log import logger -from app.router.private.stats import add_online_user -from app.utils import utcnow - - -class OnlineStatusManager: - """在线状态管理器""" - - @staticmethod - async def set_user_online(user_id: int, hub_type: str = "general") -> None: - """ - 设置用户为在线状态 - - Args: - user_id: 用户ID - hub_type: Hub类型 (metadata, spectator, multiplayer等) - """ - try: - redis = get_redis() - - # 1. 添加到在线用户集合 - await add_online_user(user_id) - - # 2. 设置metadata在线标记,这是is_online检查的关键 - metadata_key = f"metadata:online:{user_id}" - await redis.set(metadata_key, hub_type, ex=7200) # 2小时过期 - - # 3. 设置最后活跃时间戳 - last_seen_key = f"user:last_seen:{user_id}" - await redis.set(last_seen_key, int(utcnow().timestamp()), ex=7200) - - logger.debug(f"[OnlineStatusManager] User {user_id} set online via {hub_type}") - - except Exception as e: - logger.error(f"[OnlineStatusManager] Error setting user {user_id} online: {e}") - - @staticmethod - async def refresh_user_online_status(user_id: int, hub_type: str = "active") -> None: - """ - 刷新用户的在线状态 - - Args: - user_id: 用户ID - hub_type: 当前活动类型 - """ - try: - redis = get_redis() - - # 刷新metadata在线标记 - metadata_key = f"metadata:online:{user_id}" - await redis.set(metadata_key, hub_type, ex=7200) - - # 刷新最后活跃时间 - last_seen_key = f"user:last_seen:{user_id}" - await redis.set(last_seen_key, int(utcnow().timestamp()), ex=7200) - - logger.debug(f"[OnlineStatusManager] Refreshed online status for user {user_id}") - - except Exception as e: - logger.error(f"[OnlineStatusManager] Error refreshing user {user_id} status: {e}") - - @staticmethod - async def set_user_offline(user_id: int) -> None: - """ - 设置用户为离线状态 - - Args: - user_id: 用户ID - """ - try: - redis = get_redis() - - # 删除metadata在线标记 - metadata_key = f"metadata:online:{user_id}" - await redis.delete(metadata_key) - - # 从在线用户集合中移除 - from app.router.private.stats import remove_online_user - - await remove_online_user(user_id) - - logger.debug(f"[OnlineStatusManager] User {user_id} set offline") - - except Exception as e: - logger.error(f"[OnlineStatusManager] Error setting user {user_id} offline: {e}") - - @staticmethod - async def is_user_online(user_id: int) -> bool: - """ - 检查用户是否在线 - - Args: - user_id: 用户ID - - Returns: - bool: 用户是否在线 - """ - try: - redis = get_redis() - metadata_key = f"metadata:online:{user_id}" - is_online = await redis.exists(metadata_key) - return bool(is_online) - except Exception as e: - logger.error(f"[OnlineStatusManager] Error checking user {user_id} online status: {e}") - return False - - @staticmethod - async def get_online_users_count() -> int: - """ - 获取在线用户数量 - - Returns: - int: 在线用户数量 - """ - try: - from app.dependencies.database import get_redis - from app.router.private.stats import _get_online_users_count - - redis = get_redis() - return await _get_online_users_count(redis) - except Exception as e: - logger.error(f"[OnlineStatusManager] Error getting online users count: {e}") - return 0 - - -# 单例实例 -online_status_manager = OnlineStatusManager() diff --git a/app/service/stats_cleanup.py b/app/service/stats_cleanup.py deleted file mode 100644 index dab46e1..0000000 --- a/app/service/stats_cleanup.py +++ /dev/null @@ -1,90 +0,0 @@ -from __future__ import annotations - -from datetime import timedelta - -from app.dependencies.database import get_redis, get_redis_message -from app.log import logger -from app.router.private.stats import ( - REDIS_ONLINE_USERS_KEY, - REDIS_PLAYING_USERS_KEY, - _redis_exec, -) -from app.utils import utcnow - - -async def cleanup_stale_online_users() -> tuple[int, int]: - """清理过期的在线和游玩用户,返回清理的用户数""" - redis_sync = get_redis_message() - redis_async = get_redis() - - online_cleaned = 0 - playing_cleaned = 0 - - try: - # 获取所有在线用户 - online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY) - playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY) - - # 检查在线用户的最后活动时间 - current_time = utcnow() - stale_threshold = current_time - timedelta(hours=2) # 2小时无活动视为过期 # noqa: F841 - - # 对于在线用户,我们检查metadata在线标记 - stale_online_users = [] - for user_id in online_users: - user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) - metadata_key = f"metadata:online:{user_id_str}" - - # 如果metadata标记不存在,说明用户已经离线 - if not await redis_async.exists(metadata_key): - stale_online_users.append(user_id_str) - - # 清理过期的在线用户 - if stale_online_users: - await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, *stale_online_users) - online_cleaned = len(stale_online_users) - logger.info(f"Cleaned {online_cleaned} stale online users") - - # 对于游玩用户,我们使用更保守的清理策略 - # 只有当用户明确不在任何hub连接中时才移除 - stale_playing_users = [] - for user_id in playing_users: - user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) - metadata_key = f"metadata:online:{user_id_str}" - - # 只有当metadata在线标记完全不存在且用户也不在在线列表中时, - # 才认为用户真正离线 - if not await redis_async.exists(metadata_key) and user_id_str not in [ - u.decode() if isinstance(u, bytes) else str(u) for u in online_users - ]: - stale_playing_users.append(user_id_str) - - # 清理过期的游玩用户 - if stale_playing_users: - await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, *stale_playing_users) - playing_cleaned = len(stale_playing_users) - logger.info(f"Cleaned {playing_cleaned} stale playing users") - - except Exception as e: - logger.error(f"Error cleaning stale users: {e}") - - return online_cleaned, playing_cleaned - - -async def refresh_redis_key_expiry() -> None: - """刷新Redis键的过期时间,防止数据丢失""" - redis_async = get_redis() - - try: - # 刷新在线用户key的过期时间 - if await redis_async.exists(REDIS_ONLINE_USERS_KEY): - await redis_async.expire(REDIS_ONLINE_USERS_KEY, 6 * 3600) # 6小时 - - # 刷新游玩用户key的过期时间 - if await redis_async.exists(REDIS_PLAYING_USERS_KEY): - await redis_async.expire(REDIS_PLAYING_USERS_KEY, 6 * 3600) # 6小时 - - logger.debug("Refreshed Redis key expiry times") - - except Exception as e: - logger.error(f"Error refreshing Redis key expiry: {e}") diff --git a/app/service/stats_scheduler.py b/app/service/stats_scheduler.py deleted file mode 100644 index 2ca3ea0..0000000 --- a/app/service/stats_scheduler.py +++ /dev/null @@ -1,186 +0,0 @@ -from __future__ import annotations - -import asyncio -from datetime import timedelta - -from app.log import logger -from app.router.private.stats import record_hourly_stats, update_registered_users_count -from app.service.enhanced_interval_stats import EnhancedIntervalStatsManager -from app.service.stats_cleanup import ( - cleanup_stale_online_users, - refresh_redis_key_expiry, -) -from app.utils import utcnow - - -class StatsScheduler: - """统计数据调度器""" - - def __init__(self): - self._running = False - self._stats_task: asyncio.Task | None = None - self._registered_task: asyncio.Task | None = None - self._cleanup_task: asyncio.Task | None = None - - def start(self) -> None: - """启动调度器""" - if self._running: - return - - self._running = True - self._stats_task = asyncio.create_task(self._stats_loop()) - self._registered_task = asyncio.create_task(self._registered_users_loop()) - self._cleanup_task = asyncio.create_task(self._cleanup_loop()) - logger.info("Stats scheduler started") - - def stop(self) -> None: - """停止调度器""" - if not self._running: - return - - self._running = False - - if self._stats_task: - self._stats_task.cancel() - if self._registered_task: - self._registered_task.cancel() - if self._cleanup_task: - self._cleanup_task.cancel() - - logger.info("Stats scheduler stopped") - - async def _stats_loop(self) -> None: - """统计数据记录循环 - 每30分钟记录一次""" - # 启动时立即记录一次统计数据 - try: - await EnhancedIntervalStatsManager.initialize_current_interval() - logger.info("Initial enhanced interval statistics initialized on startup") - except Exception as e: - logger.error(f"Error initializing enhanced interval stats: {e}") - - while self._running: - try: - # 计算下次区间结束时间 - now = utcnow() - - # 计算当前区间的结束时间 - current_minute = (now.minute // 30) * 30 - current_interval_end = now.replace(minute=current_minute, second=0, microsecond=0) + timedelta( - minutes=30 - ) - - # 如果当前时间已经超过了当前区间结束时间,说明需要等待下一个区间结束 - if now >= current_interval_end: - current_interval_end += timedelta(minutes=30) - - # 计算需要等待的时间 - sleep_seconds = (current_interval_end - now).total_seconds() - - # 添加小的缓冲时间,确保区间真正结束后再处理 - sleep_seconds += 10 # 额外等待10秒 - - # 限制等待时间范围 - sleep_seconds = max(min(sleep_seconds, 32 * 60), 10) - - logger.debug( - f"Next interval finalization in {sleep_seconds / 60:.1f} " - f"minutes at {current_interval_end.strftime('%H:%M:%S')}" - ) - await asyncio.sleep(sleep_seconds) - - if not self._running: - break - - # 完成当前区间并记录到历史 - finalized_stats = await EnhancedIntervalStatsManager.finalize_interval() - if finalized_stats: - logger.info(f"Finalized enhanced interval statistics at {utcnow().strftime('%Y-%m-%d %H:%M:%S')}") - else: - # 如果区间完成失败,使用原有方式记录 - await record_hourly_stats() - logger.info(f"Recorded hourly statistics (fallback) at {utcnow().strftime('%Y-%m-%d %H:%M:%S')}") - - # 开始新的区间统计 - await EnhancedIntervalStatsManager.initialize_current_interval() - - except Exception as e: - logger.error(f"Error in stats loop: {e}") - # 出错时等待5分钟再重试 - await asyncio.sleep(5 * 60) - - async def _registered_users_loop(self) -> None: - """注册用户数更新循环 - 每5分钟更新一次""" - # 启动时立即更新一次注册用户数 - try: - await update_registered_users_count() - logger.info("Initial registered users count updated on startup") - except Exception as e: - logger.error(f"Error updating initial registered users count: {e}") - - while self._running: - # 等待5分钟 - await asyncio.sleep(5 * 60) - - if not self._running: - break - - try: - await update_registered_users_count() - logger.debug("Updated registered users count") - except Exception as e: - logger.error(f"Error in registered users loop: {e}") - - async def _cleanup_loop(self) -> None: - """清理循环 - 每10分钟清理一次过期用户""" - # 启动时立即执行一次清理 - try: - online_cleaned, playing_cleaned = await cleanup_stale_online_users() - if online_cleaned > 0 or playing_cleaned > 0: - logger.info( - f"Initial cleanup: removed {online_cleaned} stale online users," - f" {playing_cleaned} stale playing users" - ) - - await refresh_redis_key_expiry() - except Exception as e: - logger.error(f"Error in initial cleanup: {e}") - - while self._running: - # 等待10分钟 - await asyncio.sleep(10 * 60) - - if not self._running: - break - - try: - # 清理过期用户 - online_cleaned, playing_cleaned = await cleanup_stale_online_users() - if online_cleaned > 0 or playing_cleaned > 0: - logger.info( - f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users" - ) - - # 刷新Redis key过期时间 - await refresh_redis_key_expiry() - - # 清理过期的区间数据 - await EnhancedIntervalStatsManager.cleanup_old_intervals() - - except Exception as e: - logger.error(f"Error in cleanup loop: {e}") - # 出错时等待2分钟再重试 - await asyncio.sleep(2 * 60) - - -# 全局调度器实例 -stats_scheduler = StatsScheduler() - - -def start_stats_scheduler() -> None: - """启动统计调度器""" - stats_scheduler.start() - - -def stop_stats_scheduler() -> None: - """停止统计调度器""" - stats_scheduler.stop() diff --git a/app/signalr/hub/metadata.py b/app/signalr/hub/metadata.py index 19215cb..1052be5 100644 --- a/app/signalr/hub/metadata.py +++ b/app/signalr/hub/metadata.py @@ -87,11 +87,6 @@ class MetadataHub(Hub[MetadataClientState]): async def _clean_state(self, state: MetadataClientState) -> None: user_id = int(state.connection_id) - # Use centralized offline status management - from app.service.online_status_manager import online_status_manager - - await online_status_manager.set_user_offline(user_id) - if state.pushable: await asyncio.gather(*self.broadcast_tasks(user_id, None)) @@ -112,11 +107,6 @@ class MetadataHub(Hub[MetadataClientState]): user_id = int(client.connection_id) store = self.get_or_create_state(client) - # Use centralized online status management - from app.service.online_status_manager import online_status_manager - - await online_status_manager.set_user_online(user_id, "metadata") - # CRITICAL FIX: Set online status IMMEDIATELY upon connection # This matches the C# official implementation behavior store.status = OnlineStatus.ONLINE diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index ea23fb8..e294984 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -163,11 +163,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): async def _clean_state(self, state: MultiplayerClientState): user_id = int(state.connection_id) - # Use centralized offline status management - from app.service.online_status_manager import online_status_manager - - await online_status_manager.set_user_offline(user_id) - if state.room_id != 0 and state.room_id in self.rooms: server_room = self.rooms[state.room_id] room = server_room.room @@ -179,11 +174,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): """Track online users when connecting to multiplayer hub""" logger.info(f"[MultiplayerHub] Client {client.user_id} connected") - # Use centralized online status management - from app.service.online_status_manager import online_status_manager - - await online_status_manager.set_user_online(client.user_id, "multiplayer") - def _ensure_in_room(self, client: Client) -> ServerMultiplayerRoom: store = self.get_or_create_state(client) if store.room_id == 0: diff --git a/app/signalr/hub/spectator.py b/app/signalr/hub/spectator.py index e278adc..14692f3 100644 --- a/app/signalr/hub/spectator.py +++ b/app/signalr/hub/spectator.py @@ -31,7 +31,7 @@ from app.models.spectator_hub import ( StoreClientState, StoreScore, ) -from app.utils import bg_tasks, unix_timestamp_to_windows +from app.utils import unix_timestamp_to_windows from .hub import Client, Hub @@ -160,12 +160,6 @@ class SpectatorHub(Hub[StoreClientState]): Properly notifies watched users when spectator disconnects. """ user_id = int(state.connection_id) - - # Use centralized offline status management - from app.service.online_status_manager import online_status_manager - - await online_status_manager.set_user_offline(user_id) - if state.state: await self._end_session(user_id, state.state, state) @@ -183,11 +177,6 @@ class SpectatorHub(Hub[StoreClientState]): """ logger.info(f"[SpectatorHub] Client {client.user_id} connected") - # Use centralized online status management - from app.service.online_status_manager import online_status_manager - - await online_status_manager.set_user_online(client.user_id, "spectator") - # Send all current player states to the new client # This matches the official OnConnectedAsync behavior active_states = [] @@ -281,9 +270,6 @@ class SpectatorHub(Hub[StoreClientState]): logger.warning(f"[SpectatorHub] User {user_id} began new session without ending previous one; cleaning up") try: await self._end_session(user_id, store.state, store) - from app.router.private.stats import remove_playing_user - - bg_tasks.add_task(remove_playing_user, user_id) finally: store.state = None store.beatmap_status = None @@ -320,19 +306,6 @@ class SpectatorHub(Hub[StoreClientState]): ) logger.info(f"[SpectatorHub] {client.user_id} began playing {state.beatmap_id}") - # Track playing user and maintain online status - from app.router.private.stats import add_playing_user - from app.service.online_status_manager import online_status_manager - - bg_tasks.add_task(add_playing_user, user_id) - - # Critical fix: Maintain metadata online presence during gameplay - # This ensures the user appears online while playing - await online_status_manager.refresh_user_online_status(user_id, "playing") - - # # 预缓存beatmap文件以加速后续PP计算 - # await self._preload_beatmap_for_pp_calculation(state.beatmap_id) - await self.broadcast_group_call( self.group_id(user_id), "UserBeganPlaying", @@ -346,12 +319,6 @@ class SpectatorHub(Hub[StoreClientState]): if store.state is None or store.score is None: return - # Critical fix: Refresh online status during active gameplay - # This prevents users from appearing offline while playing - from app.service.online_status_manager import online_status_manager - - await online_status_manager.refresh_user_online_status(user_id, "playing_active") - header = frame_data.header score_info = store.score.score_info score_info.accuracy = header.accuracy @@ -387,11 +354,6 @@ class SpectatorHub(Hub[StoreClientState]): # End the play session and notify watchers await self._end_session(user_id, state, store) - # Remove from playing user tracking - from app.router.private.stats import remove_playing_user - - bg_tasks.add_task(remove_playing_user, user_id) - finally: # CRITICAL FIX: Always clear state in finally block to ensure cleanup # This matches the official C# implementation pattern diff --git a/app/signalr/router.py b/app/signalr/router.py index 55b08fc..9fa316c 100644 --- a/app/signalr/router.py +++ b/app/signalr/router.py @@ -9,6 +9,7 @@ import uuid from app.database import User as DBUser from app.dependencies import get_current_user from app.dependencies.database import DBFactory, get_db_factory +from app.log import logger from app.models.signalr import NegotiateResponse, Transport from .hub import Hubs @@ -18,6 +19,10 @@ from fastapi import APIRouter, Depends, Header, HTTPException, Query, WebSocket from fastapi.security import SecurityScopes router = APIRouter(prefix="/signalr", include_in_schema=False) +logger.warning( + "The Python version of SignalR server is deprecated. " + "Maybe it will be removed or be fixed to continuously use in the future" +) @router.post("/{hub}/negotiate", response_model=NegotiateResponse) diff --git a/main.py b/main.py index 276168e..d4e12b7 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,6 @@ from app.router import ( lio_router, private_router, redirect_api_router, - signalr_router, ) from app.router.redirect import redirect_router from app.scheduler.cache_scheduler import start_cache_scheduler, stop_cache_scheduler @@ -34,10 +33,8 @@ from app.service.email_queue import start_email_processor, stop_email_processor from app.service.geoip_scheduler import schedule_geoip_updates from app.service.init_geoip import init_geoip from app.service.load_achievements import load_achievements -from app.service.online_status_maintenance import schedule_online_status_maintenance from app.service.osu_rx_statistics import create_rx_statistics from app.service.redis_message_system import redis_message_system -from app.service.stats_scheduler import start_stats_scheduler, stop_stats_scheduler from app.utils import bg_tasks, utcnow from fastapi import FastAPI, HTTPException, Request @@ -64,8 +61,6 @@ async def lifespan(app: FastAPI): await start_cache_scheduler() # 启动缓存调度器 await start_database_cleanup_scheduler() # 启动数据库清理调度器 redis_message_system.start() # 启动 Redis 消息系统 - start_stats_scheduler() # 启动统计调度器 - schedule_online_status_maintenance() # 启动在线状态维护任务 load_achievements() # 显示资源代理状态 @@ -77,7 +72,6 @@ async def lifespan(app: FastAPI): bg_tasks.stop() stop_scheduler() redis_message_system.stop() # 停止 Redis 消息系统 - stop_stats_scheduler() # 停止统计调度器 await stop_cache_scheduler() # 停止缓存调度器 await stop_database_cleanup_scheduler() # 停止数据库清理调度器 await download_service.stop_health_check() # 停止下载服务健康检查 @@ -127,13 +121,15 @@ app.include_router(api_v2_router) app.include_router(api_v1_router) app.include_router(chat_router) app.include_router(redirect_api_router) -app.include_router(signalr_router) app.include_router(fetcher_router) app.include_router(file_router) app.include_router(auth_router) app.include_router(private_router) app.include_router(lio_router) +# from app.signalr import signalr_router +# app.include_router(signalr_router) + # CORS 配置 origins = [] for url in [*settings.cors_urls, settings.server_url]: