From eedc23fa7fc859aecd9323434207267bdeb2c060 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 10:17:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=9C=A8=E7=BA=BF=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/service/online_status_maintenance.py | 74 ++++++++++++ app/service/online_status_manager.py | 136 +++++++++++++++++++++++ app/service/stats_cleanup.py | 14 ++- app/signalr/hub/metadata.py | 48 +++++--- app/signalr/hub/multiplayer.py | 14 +-- app/signalr/hub/spectator.py | 53 +++++---- main.py | 2 + 7 files changed, 290 insertions(+), 51 deletions(-) create mode 100644 app/service/online_status_maintenance.py create mode 100644 app/service/online_status_manager.py diff --git a/app/service/online_status_maintenance.py b/app/service/online_status_maintenance.py new file mode 100644 index 0000000..e7a1b05 --- /dev/null +++ b/app/service/online_status_maintenance.py @@ -0,0 +1,74 @@ +""" +在线状态维护服务 + +此模块提供在游玩状态下维护用户在线状态的功能, +解决游玩时显示离线的问题。 +""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta + +from app.dependencies.database import get_redis +from app.log import logger +from app.router.v2.stats import REDIS_PLAYING_USERS_KEY, _redis_exec, get_redis_message + + +async def maintain_playing_users_online_status(): + """ + 维护正在游玩用户的在线状态 + + 定期刷新正在游玩用户的metadata在线标记, + 确保他们在游玩过程中显示为在线状态。 + """ + redis_sync = get_redis_message() + redis_async = get_redis() + + try: + # 获取所有正在游玩的用户 + playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY) + + if not playing_users: + return + + logger.debug(f"Maintaining online status for {len(playing_users)} playing users") + + # 为每个游玩用户刷新metadata在线标记 + for user_id in playing_users: + user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) + metadata_key = f"metadata:online:{user_id_str}" + + # 设置或刷新metadata在线标记,过期时间为1小时 + await redis_async.set(metadata_key, "playing", ex=3600) + + logger.debug(f"Updated metadata online status for {len(playing_users)} playing users") + + except Exception as e: + logger.error(f"Error maintaining playing users online status: {e}") + + +async def start_online_status_maintenance_task(): + """ + 启动在线状态维护任务 + + 每5分钟运行一次维护任务,确保游玩用户保持在线状态 + """ + logger.info("Starting online status maintenance task") + + while True: + try: + await maintain_playing_users_online_status() + # 每5分钟运行一次 + await asyncio.sleep(300) + except Exception as e: + logger.error(f"Error in online status maintenance task: {e}") + # 出错后等待30秒再重试 + await asyncio.sleep(30) + + +def schedule_online_status_maintenance(): + """ + 调度在线状态维护任务 + """ + task = asyncio.create_task(start_online_status_maintenance_task()) + return task diff --git a/app/service/online_status_manager.py b/app/service/online_status_manager.py new file mode 100644 index 0000000..b4ef91e --- /dev/null +++ b/app/service/online_status_manager.py @@ -0,0 +1,136 @@ +""" +在线状态管理服务 + +此模块负责统一管理用户的在线状态,确保用户在连接WebSocket后立即显示为在线。 +""" +from __future__ import annotations + +import asyncio +from datetime import datetime + +from app.dependencies.database import get_redis +from app.log import logger +from app.router.v2.stats import add_online_user + + +class OnlineStatusManager: + """在线状态管理器""" + + @staticmethod + async def set_user_online(user_id: int, hub_type: str = "general") -> None: + """ + 设置用户为在线状态 + + Args: + user_id: 用户ID + hub_type: Hub类型 (metadata, spectator, multiplayer等) + """ + try: + redis = get_redis() + + # 1. 添加到在线用户集合 + await add_online_user(user_id) + + # 2. 设置metadata在线标记,这是is_online检查的关键 + metadata_key = f"metadata:online:{user_id}" + await redis.set(metadata_key, hub_type, ex=7200) # 2小时过期 + + # 3. 设置最后活跃时间戳 + last_seen_key = f"user:last_seen:{user_id}" + await redis.set(last_seen_key, int(datetime.utcnow().timestamp()), ex=7200) + + logger.debug(f"[OnlineStatusManager] User {user_id} set online via {hub_type}") + + except Exception as e: + logger.error(f"[OnlineStatusManager] Error setting user {user_id} online: {e}") + + @staticmethod + async def refresh_user_online_status(user_id: int, hub_type: str = "active") -> None: + """ + 刷新用户的在线状态 + + Args: + user_id: 用户ID + hub_type: 当前活动类型 + """ + try: + redis = get_redis() + + # 刷新metadata在线标记 + metadata_key = f"metadata:online:{user_id}" + await redis.set(metadata_key, hub_type, ex=7200) + + # 刷新最后活跃时间 + last_seen_key = f"user:last_seen:{user_id}" + await redis.set(last_seen_key, int(datetime.utcnow().timestamp()), ex=7200) + + logger.debug(f"[OnlineStatusManager] Refreshed online status for user {user_id}") + + except Exception as e: + logger.error(f"[OnlineStatusManager] Error refreshing user {user_id} status: {e}") + + @staticmethod + async def set_user_offline(user_id: int) -> None: + """ + 设置用户为离线状态 + + Args: + user_id: 用户ID + """ + try: + redis = get_redis() + + # 删除metadata在线标记 + metadata_key = f"metadata:online:{user_id}" + await redis.delete(metadata_key) + + # 从在线用户集合中移除 + from app.router.v2.stats import remove_online_user + await remove_online_user(user_id) + + logger.debug(f"[OnlineStatusManager] User {user_id} set offline") + + except Exception as e: + logger.error(f"[OnlineStatusManager] Error setting user {user_id} offline: {e}") + + @staticmethod + async def is_user_online(user_id: int) -> bool: + """ + 检查用户是否在线 + + Args: + user_id: 用户ID + + Returns: + bool: 用户是否在线 + """ + try: + redis = get_redis() + metadata_key = f"metadata:online:{user_id}" + is_online = await redis.exists(metadata_key) + return bool(is_online) + except Exception as e: + logger.error(f"[OnlineStatusManager] Error checking user {user_id} online status: {e}") + return False + + @staticmethod + async def get_online_users_count() -> int: + """ + 获取在线用户数量 + + Returns: + int: 在线用户数量 + """ + try: + from app.router.v2.stats import _get_online_users_count + from app.dependencies.database import get_redis + + redis = get_redis() + return await _get_online_users_count(redis) + except Exception as e: + logger.error(f"[OnlineStatusManager] Error getting online users count: {e}") + return 0 + + +# 单例实例 +online_status_manager = OnlineStatusManager() diff --git a/app/service/stats_cleanup.py b/app/service/stats_cleanup.py index a3856a7..a7c57d0 100644 --- a/app/service/stats_cleanup.py +++ b/app/service/stats_cleanup.py @@ -48,17 +48,19 @@ async def cleanup_stale_online_users() -> tuple[int, int]: online_cleaned = len(stale_online_users) logger.info(f"Cleaned {online_cleaned} stale online users") - # 对于游玩用户,我们也检查对应的spectator状态 + # 对于游玩用户,我们使用更保守的清理策略 + # 只有当用户明确不在任何hub连接中时才移除 stale_playing_users = [] for user_id in playing_users: user_id_str = ( user_id.decode() if isinstance(user_id, bytes) else str(user_id) ) - - # 如果用户不在在线用户列表中,说明已经离线,也应该从游玩列表中移除 - if user_id_str in stale_online_users or user_id_str not in [ - u.decode() if isinstance(u, bytes) else str(u) for u in online_users - ]: + metadata_key = f"metadata:online:{user_id_str}" + + # 只有当metadata在线标记完全不存在且用户也不在在线列表中时, + # 才认为用户真正离线 + if (not await redis_async.exists(metadata_key) and + user_id_str not in [u.decode() if isinstance(u, bytes) else str(u) for u in online_users]): stale_playing_users.append(user_id_str) # 清理过期的游玩用户 diff --git a/app/signalr/hub/metadata.py b/app/signalr/hub/metadata.py index 0a753f1..757e9c8 100644 --- a/app/signalr/hub/metadata.py +++ b/app/signalr/hub/metadata.py @@ -14,6 +14,7 @@ from app.database.playlists import Playlist from app.database.room import Room from app.database.score import Score from app.dependencies.database import get_redis, with_db +from app.log import logger from app.models.metadata_hub import ( TOTAL_SCORE_DISTRIBUTION_BINS, DailyChallengeInfo, @@ -93,16 +94,13 @@ class MetadataHub(Hub[MetadataClientState]): async def _clean_state(self, state: MetadataClientState) -> None: user_id = int(state.connection_id) - # Remove from online user tracking - from app.router.v2.stats import remove_online_user - - asyncio.create_task(remove_online_user(user_id)) + # Use centralized offline status management + from app.service.online_status_manager import online_status_manager + await online_status_manager.set_user_offline(user_id) if state.pushable: await asyncio.gather(*self.broadcast_tasks(user_id, None)) - redis = get_redis() - if await redis.exists(f"metadata:online:{state.connection_id}"): - await redis.delete(f"metadata:online:{state.connection_id}") + async with with_db() as session: async with session.begin(): user = ( @@ -122,12 +120,16 @@ class MetadataHub(Hub[MetadataClientState]): async def on_client_connect(self, client: Client) -> None: user_id = int(client.connection_id) - self.get_or_create_state(client) + store = self.get_or_create_state(client) - # Track online user - from app.router.v2.stats import add_online_user + # Use centralized online status management + from app.service.online_status_manager import online_status_manager + await online_status_manager.set_user_online(user_id, "metadata") - asyncio.create_task(add_online_user(user_id)) + # CRITICAL FIX: Set online status IMMEDIATELY upon connection + # This matches the C# official implementation behavior + store.status = OnlineStatus.ONLINE + logger.info(f"[MetadataHub] Set user {user_id} status to ONLINE upon connection") async with with_db() as session: async with session.begin(): @@ -175,8 +177,23 @@ class MetadataHub(Hub[MetadataClientState]): room_id=daily_challenge_room.id, ), ) - redis = get_redis() - await redis.set(f"metadata:online:{user_id}", "") + + # CRITICAL FIX: Immediately broadcast the user's online status to all watchers + # This ensures the user appears as "currently online" right after connection + # Similar to the C# implementation's immediate broadcast logic + online_presence_tasks = self.broadcast_tasks(user_id, store) + if online_presence_tasks: + await asyncio.gather(*online_presence_tasks) + logger.info(f"[MetadataHub] Broadcasted online status for user {user_id} to watchers") + + # Also send the user's own presence update to confirm online status + await self.call_noblock( + client, + "UserPresenceUpdated", + user_id, + store.for_push, + ) + logger.info(f"[MetadataHub] User {user_id} is now ONLINE and visible to other clients") async def UpdateStatus(self, client: Client, status: int) -> None: status_ = OnlineStatus(status) @@ -214,19 +231,22 @@ class MetadataHub(Hub[MetadataClientState]): await asyncio.gather(*tasks) async def BeginWatchingUserPresence(self, client: Client) -> None: + # Critical fix: Send all currently online users to the new watcher + # Must use for_push to get the correct UserPresence format await asyncio.gather( *[ self.call_noblock( client, "UserPresenceUpdated", user_id, - store, + store.for_push, # Fixed: use for_push instead of store ) for user_id, store in self.state.items() if store.pushable ] ) self.add_to_group(client, self.online_presence_watchers_group()) + logger.info(f"[MetadataHub] Client {client.connection_id} now watching user presence, sent {len([s for s in self.state.values() if s.pushable])} online users") async def EndWatchingUserPresence(self, client: Client) -> None: self.remove_from_group(client, self.online_presence_watchers_group()) diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index 2b43230..2ea759a 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -164,10 +164,9 @@ class MultiplayerHub(Hub[MultiplayerClientState]): async def _clean_state(self, state: MultiplayerClientState): user_id = int(state.connection_id) - # Remove from online user tracking - from app.router.v2.stats import remove_online_user - - asyncio.create_task(remove_online_user(user_id)) + # Use centralized offline status management + from app.service.online_status_manager import online_status_manager + await online_status_manager.set_user_offline(user_id) if state.room_id != 0 and state.room_id in self.rooms: server_room = self.rooms[state.room_id] @@ -182,10 +181,9 @@ class MultiplayerHub(Hub[MultiplayerClientState]): """Track online users when connecting to multiplayer hub""" logger.info(f"[MultiplayerHub] Client {client.user_id} connected") - # Track online user - from app.router.v2.stats import add_online_user - - asyncio.create_task(add_online_user(client.user_id)) + # Use centralized online status management + from app.service.online_status_manager import online_status_manager + await online_status_manager.set_user_online(client.user_id, "multiplayer") def _ensure_in_room(self, client: Client) -> ServerMultiplayerRoom: store = self.get_or_create_state(client) diff --git a/app/signalr/hub/spectator.py b/app/signalr/hub/spectator.py index 79876e5..a17c214 100644 --- a/app/signalr/hub/spectator.py +++ b/app/signalr/hub/spectator.py @@ -171,10 +171,9 @@ class SpectatorHub(Hub[StoreClientState]): """ user_id = int(state.connection_id) - # Remove from online and playing tracking - from app.router.v2.stats import remove_online_user - - asyncio.create_task(remove_online_user(user_id)) + # Use centralized offline status management + from app.service.online_status_manager import online_status_manager + await online_status_manager.set_user_offline(user_id) if state.state: await self._end_session(user_id, state.state, state) @@ -197,10 +196,9 @@ class SpectatorHub(Hub[StoreClientState]): """ logger.info(f"[SpectatorHub] Client {client.user_id} connected") - # Track online user - from app.router.v2.stats import add_online_user - - asyncio.create_task(add_online_user(client.user_id)) + # Use centralized online status management + from app.service.online_status_manager import online_status_manager + await online_status_manager.set_user_online(client.user_id, "spectator") # Send all current player states to the new client # This matches the official OnConnectedAsync behavior @@ -269,7 +267,7 @@ class SpectatorHub(Hub[StoreClientState]): # Critical addition: Notify about finished players in multiplayer games elif ( - room_user.state == MultiplayerUserState.RESULTS + hasattr(room_user.state, 'name') and room_user.state.name == 'RESULTS' and room_user.user_id not in self.state ): try: @@ -340,10 +338,15 @@ class SpectatorHub(Hub[StoreClientState]): ) logger.info(f"[SpectatorHub] {client.user_id} began playing {state.beatmap_id}") - # Track playing user + # Track playing user and maintain online status from app.router.v2.stats import add_playing_user + from app.service.online_status_manager import online_status_manager asyncio.create_task(add_playing_user(user_id)) + + # Critical fix: Maintain metadata online presence during gameplay + # This ensures the user appears online while playing + await online_status_manager.refresh_user_online_status(user_id, "playing") # # 预缓存beatmap文件以加速后续PP计算 # await self._preload_beatmap_for_pp_calculation(state.beatmap_id) @@ -357,21 +360,25 @@ class SpectatorHub(Hub[StoreClientState]): async def SendFrameData(self, client: Client, frame_data: FrameDataBundle) -> None: user_id = int(client.connection_id) - state = self.get_or_create_state(client) - if not state.score: + store = self.get_or_create_state(client) + if store.state is None or store.score is None: return - state.score.score_info.accuracy = frame_data.header.accuracy - state.score.score_info.combo = frame_data.header.combo - state.score.score_info.max_combo = frame_data.header.max_combo - state.score.score_info.statistics = frame_data.header.statistics - state.score.score_info.total_score = frame_data.header.total_score - state.score.score_info.mods = frame_data.header.mods - state.score.replay_frames.extend(frame_data.frames) + + # Critical fix: Refresh online status during active gameplay + # This prevents users from appearing offline while playing + from app.service.online_status_manager import online_status_manager + await online_status_manager.refresh_user_online_status(user_id, "playing_active") + + header = frame_data.header + score_info = store.score.score_info + score_info.accuracy = header.accuracy + score_info.combo = header.combo + score_info.max_combo = header.max_combo + score_info.statistics = header.statistics + store.score.replay_frames.extend(frame_data.frames) + await self.broadcast_group_call( - self.group_id(user_id), - "UserSentFrames", - user_id, - frame_data, + self.group_id(user_id), "UserSentFrames", user_id, frame_data ) async def EndPlaySession(self, client: Client, state: SpectatorState) -> None: diff --git a/main.py b/main.py index 9339cff..2be5c37 100644 --- a/main.py +++ b/main.py @@ -36,6 +36,7 @@ from app.service.osu_rx_statistics import create_rx_statistics from app.service.recalculate import recalculate from app.service.redis_message_system import redis_message_system from app.service.stats_scheduler import start_stats_scheduler, stop_stats_scheduler +from app.service.online_status_maintenance import schedule_online_status_maintenance # 检查 New Relic 配置文件是否存在,如果存在则初始化 New Relic newrelic_config_path = os.path.join(os.path.dirname(__file__), "newrelic.ini") @@ -85,6 +86,7 @@ async def lifespan(app: FastAPI): await start_database_cleanup_scheduler() # 启动数据库清理调度器 redis_message_system.start() # 启动 Redis 消息系统 start_stats_scheduler() # 启动统计调度器 + schedule_online_status_maintenance() # 启动在线状态维护任务 load_achievements() # on shutdown yield