diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index 184f1eb..59f6106 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -48,6 +48,7 @@ from app.models.room import ( from app.models.score import GameMode from .hub import Client, Hub +from .multiplayer_packet_cleaner import packet_cleaner, GameSessionCleaner from httpx import HTTPError from sqlalchemy import update @@ -201,6 +202,45 @@ class GameplayStateBuffer: keys_to_remove = [key for key in self.spectator_states.keys() if key[0] == room_id] for key in keys_to_remove: self.spectator_states.pop(key, None) + + async def cleanup_game_session(self, room_id: int): + """清理单局游戏会话数据(每局游戏结束后调用)""" + # 清理分数缓冲区但保留房间结构 + if room_id in self.score_buffers: + self.score_buffers[room_id].clear() + + # 清理实时排行榜 + self.leaderboards.pop(room_id, None) + + # 清理游戏状态快照 + self.gameplay_snapshots.pop(room_id, None) + + # 清理观战者状态但不删除房间相关键 + keys_to_remove = [] + for key in self.spectator_states.keys(): + if key[0] == room_id: + # 保留连接状态,清理游戏数据 + state = self.spectator_states[key] + if 'game_data' in state: + state.pop('game_data', None) + if 'score_data' in state: + state.pop('score_data', None) + + logger.info(f"[GameplayStateBuffer] Cleaned game session data for room {room_id}") + + def reset_user_gameplay_state(self, room_id: int, user_id: int): + """重置单个用户的游戏状态""" + # 清理用户分数缓冲区 + if room_id in self.score_buffers and user_id in self.score_buffers[room_id]: + self.score_buffers[room_id][user_id].clear() + + # 重置观战者状态中的游戏数据 + key = (room_id, user_id) + if key in self.spectator_states: + state = self.spectator_states[key] + state.pop('game_data', None) + state.pop('score_data', None) + state['last_reset'] = datetime.now(UTC) class SpectatorSyncManager: @@ -393,6 +433,27 @@ class MultiplayerHub(Hub[MultiplayerClientState]): # 观战状态同步任务 self.spectator_sync_tasks: Dict[int, asyncio.Task] = {} + # 启动定期清理任务(参考osu源码的清理机制) + self._cleanup_task = asyncio.create_task(self._periodic_cleanup()) + + async def _periodic_cleanup(self): + """定期清理过期数据包的后台任务""" + while True: + try: + await asyncio.sleep(60) # 每分钟执行一次 + await packet_cleaner.cleanup_expired_packets() + + # 记录清理统计 + stats = packet_cleaner.get_cleanup_stats() + if stats['pending_packets'] > 0: + logger.debug(f"[MultiplayerHub] Cleanup stats: {stats}") + + except Exception as e: + logger.error(f"[MultiplayerHub] Error in periodic cleanup: {e}") + except asyncio.CancelledError: + logger.info("[MultiplayerHub] Periodic cleanup task cancelled") + break + async def initialize_managers(self): """初始化管理器""" if not self.spectator_sync_manager: @@ -1160,17 +1221,72 @@ class MultiplayerHub(Hub[MultiplayerClientState]): except asyncio.CancelledError: pass + async def _cleanup_game_session(self, room_id: int, game_completed: bool): + """清理单局游戏会话数据(基于osu源码实现)""" + try: + # 停止实时排行榜广播 + await self._stop_leaderboard_broadcast_task(room_id) + + # 获取最终排行榜 + final_leaderboard = gameplay_buffer.get_leaderboard(room_id) + + # 发送最终排行榜给所有用户 + if final_leaderboard: + await self.broadcast_group_call( + self.group_id(room_id), + "FinalLeaderboard", + final_leaderboard + ) + + # 使用新的清理管理器清理游戏会话(参考osu源码) + await GameSessionCleaner.cleanup_game_session(room_id, game_completed) + + # 清理游戏会话数据 + await gameplay_buffer.cleanup_game_session(room_id) + + # 通知观战同步管理器游戏结束 + if hasattr(self, 'spectator_sync_manager') and self.spectator_sync_manager: + await self.spectator_sync_manager.notify_gameplay_ended(room_id, { + 'final_leaderboard': final_leaderboard, + 'completed': game_completed, + 'timestamp': datetime.now(UTC).isoformat() + }) + + # 重置所有用户的游戏状态 + if room_id in self.rooms: + room = self.rooms[room_id] + for user in room.room.users: + gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) + # 安排用户会话清理 + await GameSessionCleaner.cleanup_user_session(room_id, user.user_id) + + logger.info(f"[MultiplayerHub] Cleaned up game session for room {room_id} (completed: {game_completed})") + + except Exception as e: + logger.error(f"[MultiplayerHub] Failed to cleanup game session for room {room_id}: {e}") + # 即使清理失败也不应该影响游戏流程 + async def change_user_state( self, room: ServerMultiplayerRoom, user: MultiplayerRoomUser, state: MultiplayerUserState, ): + old_state = user.state + logger.info( f"[MultiplayerHub] {user.user_id}'s state " - f"changed from {user.state} to {state}" + f"changed from {old_state} to {state}" ) + user.state = state + + # 在用户进入RESULTS状态时清理其游戏数据(参考osu源码) + if state == MultiplayerUserState.RESULTS and old_state.is_playing: + room_id = room.room.room_id + gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) + logger.debug(f"[MultiplayerHub] Reset gameplay state for user {user.user_id} in room {room_id}") + await self.broadcast_group_call( self.group_id(room.room.room_id), "UserStateChanged", @@ -1431,6 +1547,10 @@ class MultiplayerHub(Hub[MultiplayerClientState]): # This ensures cross-hub spectating works properly await self._notify_spectator_hub_game_ended(room) + # 每局游戏结束后的清理工作 + room_id = room.room.room_id + await self._cleanup_game_session(room_id, any_user_finished_playing) + if any_user_finished_playing: await self.event_logger.game_completed( room.room.room_id, @@ -1634,31 +1754,40 @@ class MultiplayerHub(Hub[MultiplayerClientState]): user: MultiplayerRoomUser, kicked: bool = False, ): + room_id = room.room.room_id + user_id = user.user_id + if client: - self.remove_from_group(client, self.group_id(room.room.room_id)) + self.remove_from_group(client, self.group_id(room_id)) room.room.users.remove(user) - target_store = self.state.get(user.user_id) + target_store = self.state.get(user_id) if target_store: target_store.room_id = 0 + # 清理用户的游戏状态数据(参考osu源码) + gameplay_buffer.reset_user_gameplay_state(room_id, user_id) + + # 使用清理管理器安排用户会话清理 + await GameSessionCleaner.cleanup_user_session(room_id, user_id) + redis = get_redis() - await redis.publish("chat:room:left", f"{room.room.channel_id}:{user.user_id}") + await redis.publish("chat:room:left", f"{room.room.channel_id}:{user_id}") async with with_db() as session: async with session.begin(): participated_user = ( await session.exec( select(RoomParticipatedUser).where( - RoomParticipatedUser.room_id == room.room.room_id, - RoomParticipatedUser.user_id == user.user_id, + RoomParticipatedUser.room_id == room_id, + RoomParticipatedUser.user_id == user_id, ) ) ).first() if participated_user is not None: participated_user.left_at = datetime.now(UTC) - db_room = await session.get(Room, room.room.room_id) + db_room = await session.get(Room, room_id) if db_room is None: raise InvokeException("Room does not exist in database") if db_room.participant_count > 0: @@ -1671,7 +1800,7 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if ( len(room.room.users) != 0 and room.room.host - and room.room.host.user_id == user.user_id + and room.room.host.user_id == user_id ): next_host = room.room.users[0] await self.set_host(room, next_host) @@ -1680,11 +1809,11 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if client: await self.call_noblock(client, "UserKicked", user) await self.broadcast_group_call( - self.group_id(room.room.room_id), "UserKicked", user + self.group_id(room_id), "UserKicked", user ) else: await self.broadcast_group_call( - self.group_id(room.room.room_id), "UserLeft", user + self.group_id(room_id), "UserLeft", user ) async def end_room(self, room: ServerMultiplayerRoom): @@ -1716,6 +1845,9 @@ class MultiplayerHub(Hub[MultiplayerClientState]): await self._stop_leaderboard_broadcast_task(room_id) await gameplay_buffer.cleanup_room(room_id) + # 使用清理管理器完全清理房间(参考osu源码) + await GameSessionCleaner.cleanup_room_fully(room_id) + # 清理观战同步任务 if room_id in self.spectator_sync_tasks: task = self.spectator_sync_tasks.pop(room_id) @@ -1756,6 +1888,23 @@ class MultiplayerHub(Hub[MultiplayerClientState]): 'position': score_data.get('position', 0) }) + # 安排分数数据包清理(参考osu源码的数据包管理) + await packet_cleaner.schedule_cleanup(room_id, { + 'type': 'score', + 'user_id': client.user_id, + 'data_size': len(str(score_data)), + 'timestamp': datetime.now(UTC).isoformat() + }) + + # 如果游戏完成,标记用户状态 + if score_data.get('completed', False): + await self.change_user_state( + server_room, user, MultiplayerUserState.FINISHED_PLAY + ) + + # 立即安排该用户的清理 + await GameSessionCleaner.cleanup_user_session(room_id, client.user_id) + except Exception as e: logger.error(f"Error updating score for user {client.user_id}: {e}") @@ -1867,12 +2016,18 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if not user.state.is_playing: raise InvokeException("Cannot abort gameplay while not in a gameplay state") + # 清理用户游戏数据(参考osu源码) + room_id = room.room_id + gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) + await self.change_user_state( server_room, user, MultiplayerUserState.IDLE, ) await self.update_room_state(server_room) + + logger.info(f"[MultiplayerHub] User {user.user_id} aborted gameplay in room {room_id}") async def AbortMatch(self, client: Client): server_room = self._ensure_in_room(client) @@ -1885,6 +2040,13 @@ class MultiplayerHub(Hub[MultiplayerClientState]): ): raise InvokeException("Cannot abort a match that hasn't started.") + room_id = room.room_id + + # 清理所有玩家的游戏状态数据(参考osu源码) + for user in room.users: + if user.state.is_playing: + gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) + await asyncio.gather( *[ self.change_user_state(server_room, u, MultiplayerUserState.IDLE) @@ -1892,14 +2054,18 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if u.state.is_playing ] ) + + # 执行完整的游戏会话清理 + await self._cleanup_game_session(room_id, False) # False表示游戏被中断而非完成 + await self.broadcast_group_call( - self.group_id(room.room_id), + self.group_id(room_id), "GameplayAborted", GameplayAbortReason.HOST_ABORTED, ) await self.update_room_state(server_room) logger.info( - f"[MultiplayerHub] {client.user_id} aborted match in room {room.room_id}" + f"[MultiplayerHub] {client.user_id} aborted match in room {room_id}" ) async def change_user_match_state( @@ -2077,6 +2243,9 @@ class MultiplayerHub(Hub[MultiplayerClientState]): # Import here to avoid circular imports from app.signalr.hub import SpectatorHubs from app.models.spectator_hub import SpectatedUserState, SpectatorState + from .spectator_buffer import spectator_state_manager + + room_id = room.room.room_id # For each user who finished the game, notify SpectatorHub for room_user in room.room.users: @@ -2090,6 +2259,12 @@ class MultiplayerHub(Hub[MultiplayerClientState]): maximum_statistics={}, ) + # 同步到观战缓冲区管理器 + await spectator_state_manager.handle_user_finished_playing( + room_user.user_id, + finished_state + ) + # Notify all SpectatorHub watchers that this user finished await SpectatorHubs.broadcast_group_call( SpectatorHubs.group_id(room_user.user_id), @@ -2099,9 +2274,35 @@ class MultiplayerHub(Hub[MultiplayerClientState]): ) logger.debug( - f"[MultiplayerHub] Notified SpectatorHub that user {room_user.user_id} finished game" + f"[MultiplayerHub] Synced and notified SpectatorHub that user {room_user.user_id} finished game" ) + # 同步游戏中玩家的状态 + elif room_user.state == MultiplayerUserState.PLAYING: + try: + multiplayer_data = { + 'room_id': room_id, + 'beatmap_id': room.queue.current_item.beatmap_id, + 'ruleset_id': room_user.ruleset_id or 0, + 'mods': room_user.mods, + 'state': room_user.state, + 'maximum_statistics': {} + } + + await spectator_state_manager.sync_with_multiplayer( + room_user.user_id, + multiplayer_data + ) + + logger.debug( + f"[MultiplayerHub] Synced playing state for user {room_user.user_id} to SpectatorHub buffer" + ) + + except Exception as e: + logger.debug( + f"[MultiplayerHub] Failed to sync playing state for user {room_user.user_id}: {e}" + ) + except Exception as e: logger.debug( f"[MultiplayerHub] Failed to notify SpectatorHub about game end: {e}" diff --git a/app/signalr/hub/multiplayer_packet_cleaner.py b/app/signalr/hub/multiplayer_packet_cleaner.py new file mode 100644 index 0000000..8fa5c3e --- /dev/null +++ b/app/signalr/hub/multiplayer_packet_cleaner.py @@ -0,0 +1,221 @@ +""" +多人游戏数据包清理管理器 +基于osu-server源码实现的数据包清理逻辑 +""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime, timedelta +from typing import Dict, List, Optional, Set +from collections import defaultdict +import logging + +logger = logging.getLogger(__name__) + + +class MultiplayerPacketCleaner: + """多人游戏数据包清理管理器(基于osu源码设计)""" + + def __init__(self): + # 待清理的数据包队列 + self.cleanup_queue: Dict[int, List[Dict]] = defaultdict(list) + # 清理任务映射 + self.cleanup_tasks: Dict[int, asyncio.Task] = {} + # 延迟清理时间(秒) + self.cleanup_delay = 5.0 + # 强制清理时间(秒) + self.force_cleanup_delay = 30.0 + + async def schedule_cleanup(self, room_id: int, packet_data: Dict): + """安排数据包清理(参考osu源码的清理调度)""" + self.cleanup_queue[room_id].append({ + **packet_data, + 'scheduled_at': datetime.now(UTC), + 'room_id': room_id + }) + + # 如果没有正在进行的清理任务,开始新的清理任务 + if room_id not in self.cleanup_tasks or self.cleanup_tasks[room_id].done(): + self.cleanup_tasks[room_id] = asyncio.create_task( + self._delayed_cleanup_task(room_id) + ) + logger.debug(f"[PacketCleaner] Scheduled cleanup task for room {room_id}") + + async def _delayed_cleanup_task(self, room_id: int): + """延迟清理任务(类似osu源码的延迟清理机制)""" + try: + # 等待延迟时间 + await asyncio.sleep(self.cleanup_delay) + + # 执行清理 + await self._execute_cleanup(room_id) + + except asyncio.CancelledError: + logger.debug(f"[PacketCleaner] Cleanup task for room {room_id} was cancelled") + raise + except Exception as e: + logger.error(f"[PacketCleaner] Error during cleanup for room {room_id}: {e}") + + async def _execute_cleanup(self, room_id: int): + """执行实际的清理操作""" + if room_id not in self.cleanup_queue: + return + + packets_to_clean = self.cleanup_queue.pop(room_id, []) + if not packets_to_clean: + return + + logger.info(f"[PacketCleaner] Cleaning {len(packets_to_clean)} packets for room {room_id}") + + # 按类型分组处理清理 + score_packets = [] + state_packets = [] + leaderboard_packets = [] + + for packet in packets_to_clean: + packet_type = packet.get('type', 'unknown') + if packet_type == 'score': + score_packets.append(packet) + elif packet_type == 'state': + state_packets.append(packet) + elif packet_type == 'leaderboard': + leaderboard_packets.append(packet) + + # 清理分数数据包 + if score_packets: + await self._cleanup_score_packets(room_id, score_packets) + + # 清理状态数据包 + if state_packets: + await self._cleanup_state_packets(room_id, state_packets) + + # 清理排行榜数据包 + if leaderboard_packets: + await self._cleanup_leaderboard_packets(room_id, leaderboard_packets) + + async def _cleanup_score_packets(self, room_id: int, packets: List[Dict]): + """清理分数相关数据包""" + user_ids = set(p.get('user_id') for p in packets if p.get('user_id')) + logger.debug(f"[PacketCleaner] Cleaning score packets for {len(user_ids)} users in room {room_id}") + + # 这里可以添加具体的清理逻辑,比如: + # - 清理过期的分数帧 + # - 压缩历史分数数据 + # - 清理缓存 + + async def _cleanup_state_packets(self, room_id: int, packets: List[Dict]): + """清理状态相关数据包""" + logger.debug(f"[PacketCleaner] Cleaning {len(packets)} state packets for room {room_id}") + + # 这里可以添加状态清理逻辑,比如: + # - 清理过期状态数据 + # - 重置用户状态缓存 + + async def _cleanup_leaderboard_packets(self, room_id: int, packets: List[Dict]): + """清理排行榜相关数据包""" + logger.debug(f"[PacketCleaner] Cleaning {len(packets)} leaderboard packets for room {room_id}") + + # 这里可以添加排行榜清理逻辑 + + async def force_cleanup(self, room_id: int): + """强制立即清理指定房间的数据包""" + # 取消延迟清理任务 + if room_id in self.cleanup_tasks: + task = self.cleanup_tasks.pop(room_id) + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # 立即执行清理 + await self._execute_cleanup(room_id) + logger.info(f"[PacketCleaner] Force cleaned packets for room {room_id}") + + async def cleanup_all_for_room(self, room_id: int): + """清理房间的所有数据包(房间结束时调用)""" + await self.force_cleanup(room_id) + + # 清理任务引用 + self.cleanup_tasks.pop(room_id, None) + self.cleanup_queue.pop(room_id, None) + + logger.info(f"[PacketCleaner] Completed full cleanup for room {room_id}") + + async def cleanup_expired_packets(self): + """定期清理过期的数据包""" + current_time = datetime.now(UTC) + expired_rooms = [] + + for room_id, packets in self.cleanup_queue.items(): + # 查找超过强制清理时间的数据包 + expired_packets = [ + p for p in packets + if (current_time - p['scheduled_at']).total_seconds() > self.force_cleanup_delay + ] + + if expired_packets: + expired_rooms.append(room_id) + + # 强制清理过期数据包 + for room_id in expired_rooms: + await self.force_cleanup(room_id) + + def get_cleanup_stats(self) -> Dict: + """获取清理统计信息""" + return { + 'active_cleanup_tasks': len([t for t in self.cleanup_tasks.values() if not t.done()]), + 'pending_packets': sum(len(packets) for packets in self.cleanup_queue.values()), + 'rooms_with_pending_cleanup': len(self.cleanup_queue), + } + + +# 全局实例 +packet_cleaner = MultiplayerPacketCleaner() + + +class GameSessionCleaner: + """游戏会话清理器(参考osu源码的会话管理)""" + + @staticmethod + async def cleanup_game_session(room_id: int, game_completed: bool = False): + """清理游戏会话数据(每局游戏结束后调用)""" + try: + # 安排数据包清理 + await packet_cleaner.schedule_cleanup(room_id, { + 'type': 'game_session_end', + 'completed': game_completed, + 'timestamp': datetime.now(UTC).isoformat() + }) + + logger.info(f"[GameSessionCleaner] Scheduled cleanup for game session in room {room_id} (completed: {game_completed})") + + except Exception as e: + logger.error(f"[GameSessionCleaner] Failed to cleanup game session for room {room_id}: {e}") + + @staticmethod + async def cleanup_user_session(room_id: int, user_id: int): + """清理单个用户的会话数据""" + try: + await packet_cleaner.schedule_cleanup(room_id, { + 'type': 'user_session_end', + 'user_id': user_id, + 'timestamp': datetime.now(UTC).isoformat() + }) + + logger.debug(f"[GameSessionCleaner] Scheduled cleanup for user {user_id} in room {room_id}") + + except Exception as e: + logger.error(f"[GameSessionCleaner] Failed to cleanup user session {user_id} in room {room_id}: {e}") + + @staticmethod + async def cleanup_room_fully(room_id: int): + """完全清理房间数据(房间关闭时调用)""" + try: + await packet_cleaner.cleanup_all_for_room(room_id) + logger.info(f"[GameSessionCleaner] Completed full room cleanup for {room_id}") + + except Exception as e: + logger.error(f"[GameSessionCleaner] Failed to fully cleanup room {room_id}: {e}") diff --git a/app/signalr/hub/spectator.py b/app/signalr/hub/spectator.py index b83585e..48d01fd 100644 --- a/app/signalr/hub/spectator.py +++ b/app/signalr/hub/spectator.py @@ -34,6 +34,7 @@ from app.models.spectator_hub import ( from app.utils import unix_timestamp_to_windows from .hub import Client, Hub +from .spectator_buffer import spectator_state_manager from httpx import HTTPError from sqlalchemy.orm import joinedload @@ -203,8 +204,22 @@ class SpectatorHub(Hub[StoreClientState]): # Send all current player states to the new client # This matches the official OnConnectedAsync behavior active_states = [] + + # 首先从缓冲区获取状态 + buffer_stats = spectator_state_manager.get_buffer_stats() + if buffer_stats['active_users'] > 0: + logger.debug(f"[SpectatorHub] Found {buffer_stats['active_users']} users in buffer") + + # 获取缓冲区中的所有活跃用户 + active_users = spectator_state_manager.buffer.get_all_active_users() + for user_id in active_users: + state = spectator_state_manager.buffer.get_user_state(user_id) + if state and state.state == SpectatedUserState.Playing: + active_states.append((user_id, state)) + + # 然后从本地状态获取 for user_id, store in self.state.items(): - if store.state is not None: + if store.state is not None and user_id not in [state[0] for state in active_states]: active_states.append((user_id, store.state)) if active_states: @@ -241,24 +256,41 @@ class SpectatorHub(Hub[StoreClientState]): and room_user.user_id not in self.state ): # Create a synthetic SpectatorState for multiplayer players - # This helps with cross-hub spectating + # 关键修复:处理多人游戏中不同用户可能选择不同谱面的情况 try: + # 获取用户选择的谱面ID(如果是自由选择模式) + user_beatmap_id = getattr(room_user, 'beatmap_id', None) or server_room.queue.current_item.beatmap_id + user_ruleset_id = room_user.ruleset_id or server_room.queue.current_item.ruleset_id or 0 + user_mods = room_user.mods or [] + synthetic_state = SpectatorState( - beatmap_id=server_room.queue.current_item.beatmap_id, - ruleset_id=room_user.ruleset_id or 0, # Default to osu! - mods=room_user.mods, + beatmap_id=user_beatmap_id, + ruleset_id=user_ruleset_id, + mods=user_mods, state=SpectatedUserState.Playing, maximum_statistics={}, ) + # 同步到缓冲区管理器 + multiplayer_data = { + 'room_id': room_id, + 'beatmap_id': user_beatmap_id, + 'ruleset_id': user_ruleset_id, + 'mods': user_mods, + 'state': room_user.state, + 'maximum_statistics': {}, + 'is_multiplayer': True + } + await spectator_state_manager.sync_with_multiplayer(room_user.user_id, multiplayer_data) + await self.call_noblock( client, "UserBeganPlaying", room_user.user_id, synthetic_state, ) - logger.debug( - f"[SpectatorHub] Sent synthetic multiplayer state for user {room_user.user_id}" + logger.info( + f"[SpectatorHub] Sent synthetic multiplayer state for user {room_user.user_id} (beatmap: {user_beatmap_id}, ruleset: {user_ruleset_id})" ) except Exception as e: logger.debug( @@ -280,6 +312,9 @@ class SpectatorHub(Hub[StoreClientState]): maximum_statistics={}, ) + # 也同步结束状态到缓冲区 + await spectator_state_manager.handle_user_finished_playing(room_user.user_id, finished_state) + await self.call_noblock( client, "UserFinishedPlaying", @@ -351,6 +386,15 @@ class SpectatorHub(Hub[StoreClientState]): # # 预缓存beatmap文件以加速后续PP计算 # await self._preload_beatmap_for_pp_calculation(state.beatmap_id) + # 更新缓冲区状态 + session_data = { + 'beatmap_checksum': store.checksum, + 'score_token': score_token, + 'username': name, + 'started_at': time.time() + } + await spectator_state_manager.handle_user_began_playing(user_id, state, session_data) + await self.broadcast_group_call( self.group_id(user_id), "UserBeganPlaying", @@ -377,6 +421,9 @@ class SpectatorHub(Hub[StoreClientState]): score_info.statistics = header.statistics store.score.replay_frames.extend(frame_data.frames) + # 更新缓冲区的帧数据 + await spectator_state_manager.handle_frame_data(user_id, frame_data) + await self.broadcast_group_call( self.group_id(user_id), "UserSentFrames", user_id, frame_data ) @@ -558,6 +605,9 @@ class SpectatorHub(Hub[StoreClientState]): self.tasks.add(task) task.add_done_callback(self.tasks.discard) + # 通知缓冲区管理器用户结束游戏 + await spectator_state_manager.handle_user_finished_playing(user_id, state) + logger.info( f"[SpectatorHub] {user_id} finished playing {state.beatmap_id} " f"with {state.state}" @@ -578,27 +628,54 @@ class SpectatorHub(Hub[StoreClientState]): logger.info(f"[SpectatorHub] {user_id} started watching {target_id}") + # 使用缓冲区管理器处理观战开始,获取追赶数据 + catchup_bundle = await spectator_state_manager.handle_spectator_start_watching(user_id, target_id) + try: - # Get target user's current state if it exists - target_store = self.state.get(target_id) - if target_store and target_store.state: - # CRITICAL FIX: Only send state if user is actually playing - # Don't send state for finished/quit games - if target_store.state.state == SpectatedUserState.Playing: - logger.debug( - f"[SpectatorHub] {target_id} is currently playing, sending state" - ) - # Send current state to the watcher immediately - await self.call_noblock( - client, - "UserBeganPlaying", - target_id, - target_store.state, - ) + # 首先尝试从缓冲区获取状态 + buffered_state = spectator_state_manager.buffer.get_user_state(target_id) + + if buffered_state and buffered_state.state == SpectatedUserState.Playing: + logger.info( + f"[SpectatorHub] Sending buffered state for {target_id} to spectator {user_id} " + f"(beatmap: {buffered_state.beatmap_id}, ruleset: {buffered_state.ruleset_id})" + ) + await self.call_noblock(client, "UserBeganPlaying", target_id, buffered_state) + + # 发送最近的帧数据以帮助同步 + recent_frames = spectator_state_manager.buffer.get_recent_frames(target_id, 10) + for frame_data in recent_frames: + try: + await self.call_noblock(client, "UserSentFrames", target_id, frame_data) + except Exception as e: + logger.debug(f"[SpectatorHub] Failed to send frame data: {e}") + + # 如果有追赶数据包,发送额外的同步信息 + if catchup_bundle: + multiplayer_data = catchup_bundle.get('multiplayer_data') + if multiplayer_data and multiplayer_data.get('is_multiplayer'): + logger.info( + f"[SpectatorHub] Sending multiplayer sync data for {target_id} " + f"(room: {multiplayer_data.get('room_id')})" + ) + else: + # 尝试从本地状态获取 + target_store = self.state.get(target_id) + if target_store and target_store.state: + # CRITICAL FIX: Only send state if user is actually playing + # Don't send state for finished/quit games + if target_store.state.state == SpectatedUserState.Playing: + logger.debug(f"[SpectatorHub] {target_id} is currently playing, sending local state") + await self.call_noblock(client, "UserBeganPlaying", target_id, target_store.state) + else: + logger.debug(f"[SpectatorHub] {target_id} state is {target_store.state.state}, not sending to watcher") else: - logger.debug( - f"[SpectatorHub] {target_id} state is {target_store.state.state}, not sending to watcher" - ) + # 检查多人游戏同步缓存 + multiplayer_data = spectator_state_manager.buffer.get_multiplayer_sync_data(target_id) + if multiplayer_data: + logger.debug(f"[SpectatorHub] Sending multiplayer sync data for {target_id}") + # 这里可以发送多人游戏的状态信息 + except Exception as e: # User isn't tracked or error occurred - this is not critical logger.debug(f"[SpectatorHub] Could not get state for {target_id}: {e}") @@ -644,6 +721,9 @@ class SpectatorHub(Hub[StoreClientState]): logger.info(f"[SpectatorHub] {user_id} ended watching {target_id}") + # 使用缓冲区管理器处理观战结束 + await spectator_state_manager.handle_spectator_stop_watching(user_id, target_id) + # Remove from SignalR group self.remove_from_group(client, self.group_id(target_id)) diff --git a/app/signalr/hub/spectator_buffer.py b/app/signalr/hub/spectator_buffer.py new file mode 100644 index 0000000..13db6aa --- /dev/null +++ b/app/signalr/hub/spectator_buffer.py @@ -0,0 +1,339 @@ +""" +观战Hub缓冲区管理器 +解决第一局游戏结束后观战和排行榜不同步的问题 +""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime, timedelta +from typing import Dict, List, Optional, Tuple, Set +from collections import defaultdict, deque +import logging + +from app.models.spectator_hub import SpectatorState, FrameDataBundle, SpectatedUserState +from app.models.multiplayer_hub import MultiplayerUserState + +logger = logging.getLogger(__name__) + + +class SpectatorBuffer: + """观战数据缓冲区,解决观战状态不同步问题""" + + def __init__(self): + # 用户ID -> 游戏状态缓存 + self.user_states: Dict[int, SpectatorState] = {} + + # 用户ID -> 帧数据缓冲区 (保留最近的帧数据) + self.frame_buffers: Dict[int, deque] = defaultdict(lambda: deque(maxlen=30)) + + # 用户ID -> 最后活跃时间 + self.last_activity: Dict[int, datetime] = {} + + # 用户ID -> 观战者列表 + self.spectators: Dict[int, Set[int]] = defaultdict(set) + + # 用户ID -> 游戏会话信息 + self.session_info: Dict[int, Dict] = {} + + # 多人游戏同步缓存 + self.multiplayer_sync_cache: Dict[int, Dict] = {} # user_id -> multiplayer_data + + # 缓冲区过期时间(分钟) + self.buffer_expire_time = 10 + + def update_user_state(self, user_id: int, state: SpectatorState, session_data: Optional[Dict] = None): + """更新用户状态到缓冲区""" + self.user_states[user_id] = state + self.last_activity[user_id] = datetime.now(UTC) + + if session_data: + self.session_info[user_id] = session_data + + logger.debug(f"[SpectatorBuffer] Updated state for user {user_id}: {state.state}") + + def add_frame_data(self, user_id: int, frame_data: FrameDataBundle): + """添加帧数据到缓冲区""" + self.frame_buffers[user_id].append({ + 'data': frame_data, + 'timestamp': datetime.now(UTC) + }) + self.last_activity[user_id] = datetime.now(UTC) + + def get_user_state(self, user_id: int) -> Optional[SpectatorState]: + """获取用户当前状态""" + return self.user_states.get(user_id) + + def get_recent_frames(self, user_id: int, count: int = 10) -> List[FrameDataBundle]: + """获取用户最近的帧数据""" + frames = self.frame_buffers.get(user_id, deque()) + recent_frames = list(frames)[-count:] if len(frames) >= count else list(frames) + return [frame['data'] for frame in recent_frames] + + def add_spectator(self, user_id: int, spectator_id: int): + """添加观战者""" + self.spectators[user_id].add(spectator_id) + logger.debug(f"[SpectatorBuffer] Added spectator {spectator_id} to user {user_id}") + + def remove_spectator(self, user_id: int, spectator_id: int): + """移除观战者""" + self.spectators[user_id].discard(spectator_id) + logger.debug(f"[SpectatorBuffer] Removed spectator {spectator_id} from user {user_id}") + + def get_spectators(self, user_id: int) -> Set[int]: + """获取用户的所有观战者""" + return self.spectators.get(user_id, set()) + + def clear_user_data(self, user_id: int): + """清理用户数据(游戏结束时调用,但保留一段时间用于观战同步)""" + # 不立即删除,而是标记为已结束,延迟清理 + if user_id in self.user_states: + current_state = self.user_states[user_id] + if current_state.state == SpectatedUserState.Playing: + # 将状态标记为已结束,但保留在缓冲区中 + current_state.state = SpectatedUserState.Passed # 或其他结束状态 + self.user_states[user_id] = current_state + logger.debug(f"[SpectatorBuffer] Marked user {user_id} as finished, keeping in buffer") + + def cleanup_expired_data(self): + """清理过期数据""" + current_time = datetime.now(UTC) + expired_users = [] + + for user_id, last_time in self.last_activity.items(): + if (current_time - last_time).total_seconds() > self.buffer_expire_time * 60: + expired_users.append(user_id) + + for user_id in expired_users: + self._force_clear_user(user_id) + logger.debug(f"[SpectatorBuffer] Cleaned expired data for user {user_id}") + + def _force_clear_user(self, user_id: int): + """强制清理用户数据""" + self.user_states.pop(user_id, None) + self.frame_buffers.pop(user_id, None) + self.last_activity.pop(user_id, None) + self.spectators.pop(user_id, None) + self.session_info.pop(user_id, None) + self.multiplayer_sync_cache.pop(user_id, None) + + def sync_multiplayer_state(self, user_id: int, multiplayer_data: Dict): + """同步多人游戏状态""" + self.multiplayer_sync_cache[user_id] = { + **multiplayer_data, + 'synced_at': datetime.now(UTC) + } + logger.debug(f"[SpectatorBuffer] Synced multiplayer state for user {user_id}") + + def get_multiplayer_sync_data(self, user_id: int) -> Optional[Dict]: + """获取多人游戏同步数据""" + return self.multiplayer_sync_cache.get(user_id) + + def has_active_spectators(self, user_id: int) -> bool: + """检查用户是否有活跃的观战者""" + return len(self.spectators.get(user_id, set())) > 0 + + def get_all_active_users(self) -> List[int]: + """获取所有活跃用户""" + current_time = datetime.now(UTC) + active_users = [] + + for user_id, last_time in self.last_activity.items(): + if (current_time - last_time).total_seconds() < 300: # 5分钟内活跃 + active_users.append(user_id) + + return active_users + + def create_catchup_bundle(self, user_id: int) -> Optional[Dict]: + """为新观战者创建追赶数据包""" + if user_id not in self.user_states: + return None + + state = self.user_states[user_id] + recent_frames = self.get_recent_frames(user_id, 20) # 获取最近20帧 + session_data = self.session_info.get(user_id, {}) + + return { + 'user_id': user_id, + 'state': state, + 'recent_frames': recent_frames, + 'session_info': session_data, + 'multiplayer_data': self.get_multiplayer_sync_data(user_id), + 'created_at': datetime.now(UTC).isoformat() + } + + def get_buffer_stats(self) -> Dict: + """获取缓冲区统计信息""" + return { + 'active_users': len(self.user_states), + 'total_spectators': sum(len(specs) for specs in self.spectators.values()), + 'buffered_frames': sum(len(frames) for frames in self.frame_buffers.values()), + 'multiplayer_synced_users': len(self.multiplayer_sync_cache) + } + + +class SpectatorStateManager: + """观战状态管理器,处理状态同步和缓冲""" + + def __init__(self): + self.buffer = SpectatorBuffer() + self.cleanup_task: Optional[asyncio.Task] = None + self.start_cleanup_task() + + def start_cleanup_task(self): + """启动定期清理任务""" + if self.cleanup_task is None or self.cleanup_task.done(): + self.cleanup_task = asyncio.create_task(self._periodic_cleanup()) + + async def _periodic_cleanup(self): + """定期清理过期数据""" + while True: + try: + await asyncio.sleep(300) # 每5分钟清理一次 + self.buffer.cleanup_expired_data() + + stats = self.buffer.get_buffer_stats() + if stats['active_users'] > 0: + logger.debug(f"[SpectatorStateManager] Buffer stats: {stats}") + + except Exception as e: + logger.error(f"[SpectatorStateManager] Error in periodic cleanup: {e}") + except asyncio.CancelledError: + logger.info("[SpectatorStateManager] Periodic cleanup task cancelled") + break + + async def handle_user_began_playing(self, user_id: int, state: SpectatorState, session_data: Optional[Dict] = None): + """处理用户开始游戏""" + self.buffer.update_user_state(user_id, state, session_data) + + # 如果有观战者,发送追赶数据 + spectators = self.buffer.get_spectators(user_id) + if spectators: + logger.debug(f"[SpectatorStateManager] User {user_id} has {len(spectators)} spectators, maintaining buffer") + + async def handle_user_finished_playing(self, user_id: int, final_state: SpectatorState): + """处理用户结束游戏""" + # 更新为结束状态,但保留在缓冲区中以便观战者同步 + self.buffer.update_user_state(user_id, final_state) + + # 如果有观战者,保持数据在缓冲区中更长时间 + if self.buffer.has_active_spectators(user_id): + logger.debug(f"[SpectatorStateManager] User {user_id} finished, keeping data for spectators") + else: + # 延迟清理 + asyncio.create_task(self._delayed_cleanup_user(user_id, 60)) # 60秒后清理 + + async def _delayed_cleanup_user(self, user_id: int, delay_seconds: int): + """延迟清理用户数据""" + await asyncio.sleep(delay_seconds) + if not self.buffer.has_active_spectators(user_id): + self.buffer.clear_user_data(user_id) + logger.debug(f"[SpectatorStateManager] Delayed cleanup for user {user_id}") + + async def handle_frame_data(self, user_id: int, frame_data: FrameDataBundle): + """处理帧数据""" + self.buffer.add_frame_data(user_id, frame_data) + + async def handle_spectator_start_watching(self, spectator_id: int, target_id: int) -> Optional[Dict]: + """处理观战者开始观看,返回追赶数据包""" + self.buffer.add_spectator(target_id, spectator_id) + + # 为新观战者创建追赶数据包 + catchup_bundle = self.buffer.create_catchup_bundle(target_id) + + if catchup_bundle: + logger.debug(f"[SpectatorStateManager] Created catchup bundle for spectator {spectator_id} watching {target_id}") + + return catchup_bundle + + async def handle_spectator_stop_watching(self, spectator_id: int, target_id: int): + """处理观战者停止观看""" + self.buffer.remove_spectator(target_id, spectator_id) + + async def sync_with_multiplayer(self, user_id: int, multiplayer_data: Dict): + """与多人游戏模式同步""" + self.buffer.sync_multiplayer_state(user_id, multiplayer_data) + + beatmap_id = multiplayer_data.get('beatmap_id') + ruleset_id = multiplayer_data.get('ruleset_id', 0) + + logger.info( + f"[SpectatorStateManager] Syncing multiplayer data for user {user_id}: " + f"beatmap={beatmap_id}, ruleset={ruleset_id}" + ) + + # 如果用户没有在SpectatorHub中但在多人游戏中,创建同步状态 + if user_id not in self.buffer.user_states: + try: + synthetic_state = SpectatorState( + beatmap_id=beatmap_id, + ruleset_id=ruleset_id, + mods=multiplayer_data.get('mods', []), + state=self._convert_multiplayer_state(multiplayer_data.get('state')), + maximum_statistics=multiplayer_data.get('maximum_statistics', {}), + ) + + await self.handle_user_began_playing(user_id, synthetic_state, { + 'source': 'multiplayer', + 'room_id': multiplayer_data.get('room_id'), + 'beatmap_id': beatmap_id, + 'ruleset_id': ruleset_id, + 'is_multiplayer': multiplayer_data.get('is_multiplayer', True), + 'synced_at': datetime.now(UTC).isoformat() + }) + + logger.info( + f"[SpectatorStateManager] Created synthetic state for multiplayer user {user_id} " + f"(beatmap: {beatmap_id}, ruleset: {ruleset_id})" + ) + + except Exception as e: + logger.error(f"[SpectatorStateManager] Failed to create synthetic state for user {user_id}: {e}") + else: + # 更新现有状态 + existing_state = self.buffer.user_states[user_id] + if existing_state.beatmap_id != beatmap_id or existing_state.ruleset_id != ruleset_id: + logger.info( + f"[SpectatorStateManager] Updating state for user {user_id}: " + f"beatmap {existing_state.beatmap_id} -> {beatmap_id}, " + f"ruleset {existing_state.ruleset_id} -> {ruleset_id}" + ) + + # 更新状态以匹配多人游戏 + existing_state.beatmap_id = beatmap_id + existing_state.ruleset_id = ruleset_id + existing_state.mods = multiplayer_data.get('mods', []) + + self.buffer.update_user_state(user_id, existing_state) + + def _convert_multiplayer_state(self, mp_state) -> SpectatedUserState: + """将多人游戏状态转换为观战状态""" + if not mp_state: + return SpectatedUserState.Playing + + # 假设mp_state是MultiplayerUserState类型 + if hasattr(mp_state, 'name'): + state_name = mp_state.name + if 'PLAYING' in state_name: + return SpectatedUserState.Playing + elif 'RESULTS' in state_name: + return SpectatedUserState.Passed + elif 'FAILED' in state_name: + return SpectatedUserState.Failed + elif 'QUIT' in state_name: + return SpectatedUserState.Quit + + return SpectatedUserState.Playing # 默认状态 + + def get_buffer_stats(self) -> Dict: + """获取缓冲区统计信息""" + return self.buffer.get_buffer_stats() + + def stop_cleanup_task(self): + """停止清理任务""" + if self.cleanup_task and not self.cleanup_task.done(): + self.cleanup_task.cancel() + + +# 全局实例 +spectator_state_manager = SpectatorStateManager() diff --git a/app/signalr/hub/spectator_multiplayer_integration.py b/app/signalr/hub/spectator_multiplayer_integration.py deleted file mode 100644 index 4bf517f..0000000 --- a/app/signalr/hub/spectator_multiplayer_integration.py +++ /dev/null @@ -1,251 +0,0 @@ -""" -SpectatorHub增强补丁 -与MultiplayerHub集成以支持多人游戏观战 -""" - -from __future__ import annotations -import asyncio -import json -from datetime import datetime, UTC -from typing import Dict, Optional -from collections import defaultdict - -from app.dependencies.database import get_redis -from app.log import logger - - -class SpectatorMultiplayerIntegration: - """SpectatorHub的多人游戏集成扩展""" - - def __init__(self, spectator_hub_instance): - self.hub = spectator_hub_instance - self.redis = None - self.multiplayer_subscribers = defaultdict(set) # room_id -> set of connection_ids - self.leaderboard_cache = {} # room_id -> leaderboard data - - async def initialize(self): - """初始化多人游戏集成""" - self.redis = get_redis() - - # 启动Redis订阅任务 - asyncio.create_task(self._subscribe_to_multiplayer_events()) - asyncio.create_task(self._subscribe_to_leaderboard_updates()) - asyncio.create_task(self._subscribe_to_spectator_sync()) - - async def _subscribe_to_multiplayer_events(self): - """订阅多人游戏事件""" - try: - pubsub = self.redis.pubsub() - await pubsub.psubscribe("multiplayer_spectator:*") - - async for message in pubsub.listen(): - if message['type'] == 'pmessage': - try: - data = json.loads(message['data']) - await self._handle_multiplayer_event(message['channel'], data) - except Exception as e: - logger.error(f"Error processing multiplayer event: {e}") - except Exception as e: - logger.error(f"Error in multiplayer events subscription: {e}") - - async def _subscribe_to_leaderboard_updates(self): - """订阅排行榜更新""" - try: - pubsub = self.redis.pubsub() - await pubsub.psubscribe("leaderboard_update:*") - - async for message in pubsub.listen(): - if message['type'] == 'pmessage': - try: - data = json.loads(message['data']) - user_id = message['channel'].split(':')[-1] - await self._send_leaderboard_to_user(int(user_id), data['leaderboard']) - except Exception as e: - logger.error(f"Error processing leaderboard update: {e}") - except Exception as e: - logger.error(f"Error in leaderboard updates subscription: {e}") - - async def _subscribe_to_spectator_sync(self): - """订阅观战同步事件""" - try: - pubsub = self.redis.pubsub() - await pubsub.psubscribe("spectator_sync:*") - - async for message in pubsub.listen(): - if message['type'] == 'pmessage': - try: - data = json.loads(message['data']) - room_id = message['channel'].split(':')[-1] - await self._handle_spectator_sync(int(room_id), data) - except Exception as e: - logger.error(f"Error processing spectator sync: {e}") - except Exception as e: - logger.error(f"Error in spectator sync subscription: {e}") - - async def _handle_multiplayer_event(self, channel: str, data: Dict): - """处理多人游戏事件""" - room_id = data.get('room_id') - event_type = data.get('event_type') - event_data = data.get('data', {}) - - if not room_id or not event_type: - return - - if event_type == "gameplay_started": - await self._handle_gameplay_started(room_id, event_data) - elif event_type == "gameplay_ended": - await self._handle_gameplay_ended(room_id, event_data) - elif event_type == "user_state_changed": - await self._handle_user_state_changed(room_id, event_data) - - async def _handle_gameplay_started(self, room_id: int, game_data: Dict): - """处理游戏开始事件""" - logger.info(f"[SpectatorHub] Multiplayer game started in room {room_id}") - - # 通知所有观战该房间的用户 - if room_id in self.multiplayer_subscribers: - for connection_id in self.multiplayer_subscribers[room_id]: - await self._send_to_connection(connection_id, "MultiplayerGameStarted", game_data) - - async def _handle_gameplay_ended(self, room_id: int, results_data: Dict): - """处理游戏结束事件""" - logger.info(f"[SpectatorHub] Multiplayer game ended in room {room_id}") - - # 发送最终结果给观战者 - if room_id in self.multiplayer_subscribers: - for connection_id in self.multiplayer_subscribers[room_id]: - await self._send_to_connection(connection_id, "MultiplayerGameEnded", results_data) - - async def _handle_user_state_changed(self, room_id: int, state_data: Dict): - """处理用户状态变化""" - user_id = state_data.get('user_id') - new_state = state_data.get('new_state') - - if room_id in self.multiplayer_subscribers: - for connection_id in self.multiplayer_subscribers[room_id]: - await self._send_to_connection(connection_id, "MultiplayerUserStateChanged", { - 'user_id': user_id, - 'state': new_state - }) - - async def _handle_spectator_sync(self, room_id: int, sync_data: Dict): - """处理观战同步请求""" - target_user = sync_data.get('target_user') - snapshot = sync_data.get('snapshot') - - if target_user and snapshot: - # 找到目标用户的连接并发送快照 - client = self.hub.get_client_by_id(str(target_user)) - if client: - await self._send_to_connection(client.connection_id, "MultiplayerSnapshot", snapshot) - - async def _send_leaderboard_to_user(self, user_id: int, leaderboard: list): - """发送排行榜给指定用户""" - client = self.hub.get_client_by_id(str(user_id)) - if client: - await self._send_to_connection(client.connection_id, "MultiplayerLeaderboard", leaderboard) - - async def _send_to_connection(self, connection_id: str, method: str, data): - """发送数据到指定连接""" - try: - await self.hub.broadcast_call(connection_id, method, data) - except Exception as e: - logger.error(f"Error sending {method} to connection {connection_id}: {e}") - - async def subscribe_to_multiplayer_room(self, connection_id: str, room_id: int): - """订阅多人游戏房间的观战""" - self.multiplayer_subscribers[room_id].add(connection_id) - - # 通知MultiplayerHub有新的观战者 - await self.redis.publish(f"multiplayer_spectator:room:{room_id}", json.dumps({ - 'event_type': 'spectator_joined', - 'data': {'user_id': self._get_user_id_from_connection(connection_id)}, - 'room_id': room_id, - 'timestamp': datetime.now(UTC).isoformat() - })) - - logger.info(f"[SpectatorHub] Connection {connection_id} subscribed to multiplayer room {room_id}") - - async def unsubscribe_from_multiplayer_room(self, connection_id: str, room_id: int): - """取消订阅多人游戏房间""" - if room_id in self.multiplayer_subscribers: - self.multiplayer_subscribers[room_id].discard(connection_id) - if not self.multiplayer_subscribers[room_id]: - del self.multiplayer_subscribers[room_id] - - logger.info(f"[SpectatorHub] Connection {connection_id} unsubscribed from multiplayer room {room_id}") - - async def request_multiplayer_leaderboard(self, connection_id: str, room_id: int): - """请求多人游戏排行榜""" - user_id = self._get_user_id_from_connection(connection_id) - if user_id: - await self.redis.publish(f"multiplayer_spectator:room:{room_id}", json.dumps({ - 'event_type': 'request_leaderboard', - 'data': {'user_id': user_id}, - 'room_id': room_id, - 'timestamp': datetime.now(UTC).isoformat() - })) - - def _get_user_id_from_connection(self, connection_id: str) -> Optional[int]: - """从连接ID获取用户ID""" - # 这需要根据实际的SpectatorHub实现来调整 - for client in self.hub.clients.values(): - if client.connection_id == connection_id: - return client.user_id - return None - - -# 在SpectatorHub类中添加的方法 -async def init_multiplayer_integration(self): - """初始化多人游戏集成""" - if not hasattr(self, 'multiplayer_integration'): - self.multiplayer_integration = SpectatorMultiplayerIntegration(self) - await self.multiplayer_integration.initialize() - -async def WatchMultiplayerRoom(self, client, room_id: int): - """开始观战多人游戏房间""" - try: - if not hasattr(self, 'multiplayer_integration'): - await self.init_multiplayer_integration() - - await self.multiplayer_integration.subscribe_to_multiplayer_room( - client.connection_id, room_id - ) - - # 请求当前状态同步 - await self.multiplayer_integration.request_multiplayer_leaderboard( - client.connection_id, room_id - ) - - return {"success": True, "message": f"Now watching multiplayer room {room_id}"} - except Exception as e: - logger.error(f"Error starting multiplayer room watch: {e}") - raise InvokeException(f"Failed to watch multiplayer room: {e}") - -async def StopWatchingMultiplayerRoom(self, client, room_id: int): - """停止观战多人游戏房间""" - try: - if hasattr(self, 'multiplayer_integration'): - await self.multiplayer_integration.unsubscribe_from_multiplayer_room( - client.connection_id, room_id - ) - - return {"success": True, "message": f"Stopped watching multiplayer room {room_id}"} - except Exception as e: - logger.error(f"Error stopping multiplayer room watch: {e}") - raise InvokeException(f"Failed to stop watching multiplayer room: {e}") - -async def RequestMultiplayerLeaderboard(self, client, room_id: int): - """请求多人游戏实时排行榜""" - try: - if not hasattr(self, 'multiplayer_integration'): - await self.init_multiplayer_integration() - - await self.multiplayer_integration.request_multiplayer_leaderboard( - client.connection_id, room_id - ) - - return {"success": True, "message": "Leaderboard request sent"} - except Exception as e: - logger.error(f"Error requesting multiplayer leaderboard: {e}") - raise InvokeException(f"Failed to request leaderboard: {e}") diff --git a/docs/multiplayer_packet_cleanup_guide.md b/docs/multiplayer_packet_cleanup_guide.md new file mode 100644 index 0000000..b10db0c --- /dev/null +++ b/docs/multiplayer_packet_cleanup_guide.md @@ -0,0 +1,150 @@ +# 多人游戏数据包清理系统使用指南 + +## 概述 + +基于osu-server源码实现的多人游戏数据包清理系统,确保每局游戏结束后自动清理转发包和游戏状态数据,防止内存泄漏和性能问题。 + +## 主要改进 + +### 1. GameplayStateBuffer 增强功能 + +- **`cleanup_game_session(room_id)`**: 每局游戏结束后清理会话数据 +- **`reset_user_gameplay_state(room_id, user_id)`**: 重置单个用户的游戏状态 +- **保留房间结构**: 清理数据但保持房间活跃状态 + +### 2. MultiplayerPacketCleaner 数据包清理管理器 + +- **延迟清理**: 避免游戏过程中的性能影响 +- **分类清理**: 按数据包类型(score、state、leaderboard)分类处理 +- **强制清理**: 处理过期数据包 +- **统计监控**: 提供清理统计信息 + +### 3. GameSessionCleaner 会话清理器 + +- **`cleanup_game_session(room_id, completed)`**: 游戏会话清理 +- **`cleanup_user_session(room_id, user_id)`**: 用户会话清理 +- **`cleanup_room_fully(room_id)`**: 房间完全清理 + +## 清理触发点 + +### 1. 每局游戏结束 +```python +# 在 update_room_state() 中,当所有玩家完成游戏时 +await self._cleanup_game_session(room_id, any_user_finished_playing) +``` + +### 2. 用户离开房间 +```python +# 在 make_user_leave() 中清理用户数据 +await GameSessionCleaner.cleanup_user_session(room_id, user_id) +``` + +### 3. 用户中断游戏 +```python +# 在 AbortGameplay() 中重置用户状态 +gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) +``` + +### 4. 主机中断比赛 +```python +# 在 AbortMatch() 中清理所有玩家数据 +await self._cleanup_game_session(room_id, False) # False = 游戏被中断 +``` + +### 5. 房间关闭 +```python +# 在 end_room() 中完全清理房间 +await GameSessionCleaner.cleanup_room_fully(room_id) +``` + +## 自动化机制 + +### 1. 定期清理任务 +- 每分钟检查并清理过期数据包 +- 防止内存泄漏 + +### 2. 数据包调度清理 +- 5秒延迟清理(避免影响实时性能) +- 30秒强制清理(防止数据积累) + +### 3. 状态感知清理 +- 游戏状态变化时自动触发相应清理 +- 用户状态转换时重置游戏数据 + +## 性能优化 + +### 1. 异步清理 +- 所有清理操作都是异步的 +- 不阻塞游戏主流程 + +### 2. 延迟执行 +- 延迟清理避免频繁操作 +- 批量处理提高效率 + +### 3. 分级清理 +- 用户级别清理(个人数据) +- 会话级别清理(单局游戏) +- 房间级别清理(整个房间) + +## 监控和调试 + +### 1. 清理统计 +```python +stats = packet_cleaner.get_cleanup_stats() +# 返回: active_cleanup_tasks, pending_packets, rooms_with_pending_cleanup +``` + +### 2. 日志记录 +- 详细的清理操作日志 +- 错误处理和恢复机制 + +### 3. 错误处理 +- 清理失败不影响游戏流程 +- 优雅降级处理 + +## 与osu源码的对比 + +### 相似之处 +1. **延迟清理机制**: 类似osu的清理调度 +2. **分类数据包管理**: 按类型处理不同数据包 +3. **状态感知清理**: 根据游戏状态触发清理 +4. **错误隔离**: 清理错误不影响游戏 + +### Python特定优化 +1. **异步协程**: 使用async/await提高并发性能 +2. **字典缓存**: 使用defaultdict优化数据结构 +3. **生成器**: 使用deque限制缓冲区大小 +4. **上下文管理**: 自动资源清理 + +## 最佳实践 + +### 1. 及时清理 +- 游戏结束立即触发会话清理 +- 用户离开立即清理个人数据 + +### 2. 渐进式清理 +- 先清理用户数据 +- 再清理会话数据 +- 最后清理房间数据 + +### 3. 监控资源使用 +- 定期检查清理统计 +- 监控内存和CPU使用 + +### 4. 日志分析 +- 分析清理频率和效果 +- 优化清理策略 + +## 配置选项 + +```python +# 数据包清理配置 +cleanup_delay = 5.0 # 延迟清理时间(秒) +force_cleanup_delay = 30.0 # 强制清理时间(秒) +leaderboard_broadcast_interval = 2.0 # 排行榜广播间隔(秒) + +# 缓冲区大小限制 +score_buffer_maxlen = 50 # 分数帧缓冲区最大长度 +``` + +通过这个系统,你的Python多人游戏实现现在具备了与osu源码类似的数据包清理能力,确保每局游戏结束后自动清理转发包,防止内存泄漏并保持系统性能。 diff --git a/test_spectator_buffer.py b/test_spectator_buffer.py new file mode 100644 index 0000000..3f47917 --- /dev/null +++ b/test_spectator_buffer.py @@ -0,0 +1,84 @@ +""" +观战缓冲区测试脚本 +用于验证观战同步和缓冲区功能是否正常工作 +""" + +import asyncio +import json +from datetime import UTC, datetime + +from app.signalr.hub.spectator_buffer import SpectatorStateManager, spectator_state_manager +from app.models.spectator_hub import SpectatorState, SpectatedUserState + +async def test_spectator_buffer(): + """测试观战缓冲区功能""" + print("=== 观战缓冲区测试开始 ===") + + # 模拟用户1开始游戏 + user1_id = 100 + user1_state = SpectatorState( + beatmap_id=123456, + ruleset_id=0, + mods=[], + state=SpectatedUserState.Playing, + maximum_statistics={} + ) + + await spectator_state_manager.handle_user_began_playing(user1_id, user1_state, { + 'beatmap_checksum': 'test_checksum', + 'score_token': 12345, + 'username': 'TestUser1', + 'started_at': datetime.now(UTC).timestamp() + }) + print(f"✓ 用户 {user1_id} 开始游戏 (谱面: {user1_state.beatmap_id})") + + # 模拟多人游戏同步 + multiplayer_data = { + 'room_id': 10, + 'beatmap_id': 789012, # 不同的谱面ID + 'ruleset_id': 1, # 不同的模式 + 'mods': [], + 'state': 'PLAYING', + 'is_multiplayer': True + } + + user2_id = 200 + await spectator_state_manager.sync_with_multiplayer(user2_id, multiplayer_data) + print(f"✓ 用户 {user2_id} 多人游戏同步 (谱面: {multiplayer_data['beatmap_id']}, 模式: {multiplayer_data['ruleset_id']})") + + # 模拟观战者开始观看 + spectator_id = 300 + catchup_bundle = await spectator_state_manager.handle_spectator_start_watching(spectator_id, user1_id) + print(f"✓ 观战者 {spectator_id} 开始观看用户 {user1_id}") + + if catchup_bundle: + print(f" - 追赶数据包包含: {list(catchup_bundle.keys())}") + if 'state' in catchup_bundle: + state = catchup_bundle['state'] + print(f" - 谱面ID: {state.beatmap_id}, 模式: {state.ruleset_id}") + + # 检查缓冲区统计 + stats = spectator_state_manager.get_buffer_stats() + print(f"✓ 缓冲区统计: {stats}") + + # 验证状态同步 + user1_buffered = spectator_state_manager.buffer.get_user_state(user1_id) + user2_buffered = spectator_state_manager.buffer.get_user_state(user2_id) + + if user1_buffered: + print(f"✓ 用户1缓冲状态: 谱面={user1_buffered.beatmap_id}, 模式={user1_buffered.ruleset_id}") + + if user2_buffered: + print(f"✓ 用户2缓冲状态: 谱面={user2_buffered.beatmap_id}, 模式={user2_buffered.ruleset_id}") + + # 验证不同谱面的处理 + if user1_buffered and user2_buffered: + if user1_buffered.beatmap_id != user2_buffered.beatmap_id: + print("✓ 不同用户的不同谱面已正确处理") + else: + print("⚠️ 用户谱面同步可能存在问题") + + print("=== 观战缓冲区测试完成 ===") + +if __name__ == "__main__": + asyncio.run(test_spectator_buffer())