From 16f4ff1832183d7deea73c6cd2344005d3b41238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 02:50:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E4=BA=BA=E9=97=AE?= =?UTF-8?q?=E9=A2=98=20=E8=B4=B4=E5=90=88=E5=AE=98=E6=96=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/signalr/hub/multiplayer.py | 189 +++++++++++++++++++++++++++++++-- app/signalr/hub/spectator.py | 137 ++++++++++++++++++------ 2 files changed, 286 insertions(+), 40 deletions(-) diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index 37012a4..7a0fdd4 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -279,6 +279,11 @@ class MultiplayerHub(Hub[MultiplayerClientState]): room.users.append(user) self.add_to_group(client, self.group_id(room_id)) await server_room.match_type_handler.handle_join(user) + + # Critical fix: Send current room and gameplay state to new user + # This ensures spectators joining ongoing games get proper state sync + await self._send_room_state_to_new_user(client, server_room) + await self.event_logger.player_joined(room_id, user.user_id) async with with_db() as session: @@ -409,7 +414,7 @@ class MultiplayerHub(Hub[MultiplayerClientState]): auto_start_duration=int( room.room.settings.auto_start_duration.total_seconds() ), - host_id=room.room.host.user_id, + host_id=room.room.host.user_id if room.room.host else None, ) ) await session.commit() @@ -648,8 +653,21 @@ class MultiplayerHub(Hub[MultiplayerClientState]): case MultiplayerUserState.RESULTS: raise InvokeException(f"Cannot change state from {old} to {new}") case MultiplayerUserState.SPECTATING: - if old not in (MultiplayerUserState.IDLE, MultiplayerUserState.READY): - raise InvokeException(f"Cannot change state from {old} to {new}") + # Enhanced spectator validation - allow transitions from more states + # This matches official osu-server-spectator behavior + if old not in ( + MultiplayerUserState.IDLE, + MultiplayerUserState.READY, + MultiplayerUserState.RESULTS, # Allow spectating after results + ): + # Allow spectating during gameplay states only if the room is in appropriate state + if not (old.is_playing and room.room.state in ( + MultiplayerRoomState.WAITING_FOR_LOAD, + MultiplayerRoomState.PLAYING + )): + raise InvokeException(f"Cannot change state from {old} to {new}") + case _: + raise InvokeException(f"Invalid state transition from {old} to {new}") async def ChangeState(self, client: Client, state: MultiplayerUserState): server_room = self._ensure_in_room(client) @@ -660,6 +678,8 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if user.state == state: return + + # Special handling for state changes during gameplay match state: case MultiplayerUserState.IDLE: if user.state.is_playing: @@ -667,17 +687,22 @@ class MultiplayerHub(Hub[MultiplayerClientState]): case MultiplayerUserState.LOADED | MultiplayerUserState.READY_FOR_GAMEPLAY: if not user.state.is_playing: return + + logger.info( + f"[MultiplayerHub] User {user.user_id} changing state from {user.state} to {state}" + ) + await self.validate_user_stare( server_room, user.state, state, ) + await self.change_user_state(server_room, user, state) - if state == MultiplayerUserState.SPECTATING and ( - room.state == MultiplayerRoomState.PLAYING - or room.state == MultiplayerRoomState.WAITING_FOR_LOAD - ): - await self.call_noblock(client, "LoadRequested") + + # Enhanced spectator handling based on official implementation + if state == MultiplayerUserState.SPECTATING: + await self.handle_spectator_state_change(client, server_room, user) await self.update_room_state(server_room) @@ -699,6 +724,152 @@ class MultiplayerHub(Hub[MultiplayerClientState]): user.state, ) + async def handle_spectator_state_change( + self, + client: Client, + room: ServerMultiplayerRoom, + user: MultiplayerRoomUser + ): + """ + Handle special logic for users entering spectator mode during ongoing gameplay. + Based on official osu-server-spectator implementation. + """ + room_state = room.room.state + + # If switching to spectating during gameplay, immediately request load + if room_state == MultiplayerRoomState.WAITING_FOR_LOAD: + logger.info( + f"[MultiplayerHub] Spectator {user.user_id} joining during load phase" + ) + await self.call_noblock(client, "LoadRequested") + + elif room_state == MultiplayerRoomState.PLAYING: + logger.info( + f"[MultiplayerHub] Spectator {user.user_id} joining during active gameplay" + ) + await self.call_noblock(client, "LoadRequested") + + async def _send_current_gameplay_state_to_spectator( + self, + client: Client, + room: ServerMultiplayerRoom + ): + """ + Send current gameplay state information to a newly joined spectator. + This helps spectators sync with ongoing gameplay. + """ + try: + # Send current room state + await self.call_noblock( + client, + "RoomStateChanged", + room.room.state + ) + + # Send current user states for all players + for room_user in room.room.users: + if room_user.state.is_playing: + await self.call_noblock( + client, + "UserStateChanged", + room_user.user_id, + room_user.state, + ) + + logger.debug( + f"[MultiplayerHub] Sent current gameplay state to spectator {client.user_id}" + ) + except Exception as e: + logger.error( + f"[MultiplayerHub] Failed to send gameplay state to spectator {client.user_id}: {e}" + ) + + async def _send_room_state_to_new_user( + self, + client: Client, + room: ServerMultiplayerRoom + ): + """ + Send complete room state to a newly joined user. + Critical for spectators joining ongoing games. + """ + try: + # Send current room state + if room.room.state != MultiplayerRoomState.OPEN: + await self.call_noblock( + client, + "RoomStateChanged", + room.room.state + ) + + # If room is in gameplay state, send LoadRequested immediately + if room.room.state in ( + MultiplayerRoomState.WAITING_FOR_LOAD, + MultiplayerRoomState.PLAYING + ): + logger.info( + f"[MultiplayerHub] Sending LoadRequested to user {client.user_id} " + f"joining ongoing game (room state: {room.room.state})" + ) + await self.call_noblock(client, "LoadRequested") + + # Send all user states to help with synchronization + for room_user in room.room.users: + if room_user.user_id != client.user_id: # Don't send own state + await self.call_noblock( + client, + "UserStateChanged", + room_user.user_id, + room_user.state, + ) + + # Critical addition: Send current playing users to SpectatorHub for cross-hub sync + # This ensures spectators can watch multiplayer players properly + await self._sync_with_spectator_hub(client, room) + + logger.debug( + f"[MultiplayerHub] Sent complete room state to new user {client.user_id}" + ) + except Exception as e: + logger.error( + f"[MultiplayerHub] Failed to send room state to user {client.user_id}: {e}" + ) + + async def _sync_with_spectator_hub( + self, + client: Client, + room: ServerMultiplayerRoom + ): + """ + Sync with SpectatorHub to ensure cross-hub spectating works properly. + This is crucial for users watching multiplayer players from other pages. + """ + try: + # Import here to avoid circular imports + from app.signalr.hub import SpectatorHubs + + # For each playing user in the room, check if they have SpectatorHub state + # and notify the new client about their playing status + for room_user in room.room.users: + if room_user.state.is_playing: + spectator_state = SpectatorHubs.state.get(room_user.user_id) + if spectator_state and spectator_state.state: + # Send the spectator state to help with cross-hub watching + await self.call_noblock( + client, + "UserBeganPlaying", + room_user.user_id, + spectator_state.state, + ) + logger.debug( + f"[MultiplayerHub] Synced spectator state for user {room_user.user_id} " + f"to new client {client.user_id}" + ) + + except Exception as e: + logger.debug(f"[MultiplayerHub] Failed to sync with SpectatorHub: {e}") + # This is not critical, so we don't raise the exception + async def update_room_state(self, room: ServerMultiplayerRoom): match room.room.state: case MultiplayerRoomState.OPEN: @@ -786,7 +957,7 @@ class MultiplayerHub(Hub[MultiplayerClientState]): self._ensure_host(client, server_room) # Check host state - host must be ready or spectating - if room.host.state not in ( + if room.host and room.host.state not in ( MultiplayerUserState.SPECTATING, MultiplayerUserState.READY, ): diff --git a/app/signalr/hub/spectator.py b/app/signalr/hub/spectator.py index 7fd682e..a95b89e 100644 --- a/app/signalr/hub/spectator.py +++ b/app/signalr/hub/spectator.py @@ -165,22 +165,49 @@ class SpectatorHub(Hub[StoreClientState]): @override async def _clean_state(self, state: StoreClientState) -> None: + """ + Enhanced cleanup based on official osu-server-spectator implementation. + Properly notifies watched users when spectator disconnects. + """ if state.state: await self._end_session(int(state.connection_id), state.state, state) - for target in self.waited_clients: - target_client = self.get_client_by_id(target) - if target_client: + + # Critical fix: Notify all watched users that this spectator has disconnected + # This matches the official CleanUpState implementation + user_id = int(state.connection_id) + for watched_user_id in state.watched_user: + if (target_client := self.get_client_by_id(str(watched_user_id))) is not None: await self.call_noblock( - target_client, "UserEndedWatching", int(state.connection_id) + target_client, "UserEndedWatching", user_id + ) + logger.debug( + f"[SpectatorHub] Notified {watched_user_id} that {user_id} stopped watching" ) async def on_client_connect(self, client: Client) -> None: - tasks = [ - self.call_noblock(client, "UserBeganPlaying", user_id, store.state) + """ + Enhanced connection handling based on official implementation. + Send all active player states to newly connected clients. + """ + logger.info(f"[SpectatorHub] Client {client.user_id} connected") + + # Send all current player states to the new client + # This matches the official OnConnectedAsync behavior + active_states = [ + (user_id, store.state) for user_id, store in self.state.items() if store.state is not None ] - await asyncio.gather(*tasks) + + if active_states: + logger.debug( + f"[SpectatorHub] Sending {len(active_states)} active player states to {client.user_id}" + ) + tasks = [ + self.call_noblock(client, "UserBeganPlaying", user_id, state) + for user_id, state in active_states + ] + await asyncio.gather(*tasks, return_exceptions=True) async def BeginPlaySession( self, client: Client, score_token: int, state: SpectatorState @@ -382,20 +409,30 @@ class SpectatorHub(Hub[StoreClientState]): if state.state == SpectatedUserState.Playing: state.state = SpectatedUserState.Quit - exit_time = max(frame.time for frame in store.score.replay_frames) // 1000 # pyright: ignore[reportOptionalMemberAccess] + logger.debug(f"[SpectatorHub] Changed state from Playing to Quit for user {user_id}") + + # Calculate exit time safely + exit_time = 0 + if store.score and store.score.replay_frames: + exit_time = max(frame.time for frame in store.score.replay_frames) // 1000 - task = asyncio.create_task( - _edit_playtime( - store.score_token, # pyright: ignore[reportArgumentType] - store.ruleset_id, # pyright: ignore[reportArgumentType] - store.score.score_info.mods, # pyright: ignore[reportOptionalMemberAccess] + # Background task for playtime editing - only if we have valid data + if store.score_token and store.ruleset_id and store.score: + task = asyncio.create_task( + _edit_playtime( + store.score_token, + store.ruleset_id, + store.score.score_info.mods, + ) ) - ) - self.tasks.add(task) - task.add_done_callback(self.tasks.discard) + self.tasks.add(task) + task.add_done_callback(self.tasks.discard) + + # Background task for failtime tracking - only for failed/quit states with valid data if ( - state.state == SpectatedUserState.Failed - or state.state == SpectatedUserState.Quit + state.beatmap_id is not None + and exit_time > 0 + and state.state in (SpectatedUserState.Failed, SpectatedUserState.Quit) ): task = asyncio.create_task(_add_failtime()) self.tasks.add(task) @@ -413,39 +450,77 @@ class SpectatorHub(Hub[StoreClientState]): ) async def StartWatchingUser(self, client: Client, target_id: int) -> None: + """ + Enhanced StartWatchingUser based on official osu-server-spectator implementation. + Properly handles state synchronization and watcher notifications. + """ user_id = int(client.connection_id) - target_store = self.state.get(target_id) + logger.info(f"[SpectatorHub] {user_id} started watching {target_id}") - if target_store and target_store.state: - logger.debug(f"[SpectatorHub] {target_id} is {target_store.state}") - await self.call_noblock( - client, - "UserBeganPlaying", - target_id, - target_store.state, - ) + + try: + # Get target user's current state if it exists + target_store = self.state.get(target_id) + if target_store and target_store.state: + logger.debug(f"[SpectatorHub] {target_id} is currently {target_store.state.state}") + # Send current state to the watcher immediately + await self.call_noblock( + client, + "UserBeganPlaying", + target_id, + target_store.state, + ) + except Exception as e: + # User isn't tracked or error occurred - this is not critical + logger.debug(f"[SpectatorHub] Could not get state for {target_id}: {e}") + + # Add watcher to our tracked users store = self.get_or_create_state(client) store.watched_user.add(target_id) + # Add to SignalR group for this target user self.add_to_group(client, self.group_id(target_id)) - async with with_db() as session: - async with session.begin(): + # Get watcher's username and notify the target user + try: + async with with_db() as session: username = ( await session.exec(select(User.username).where(User.id == user_id)) ).first() if not username: + logger.warning(f"[SpectatorHub] Could not find username for user {user_id}") return + + # Notify target user that someone started watching if (target_client := self.get_client_by_id(str(target_id))) is not None: + # Create watcher info array (matches official format) + watcher_info = [[user_id, username]] await self.call_noblock( - target_client, "UserStartedWatching", [[user_id, username]] + target_client, "UserStartedWatching", watcher_info ) + logger.debug(f"[SpectatorHub] Notified {target_id} that {username} started watching") + except Exception as e: + logger.error(f"[SpectatorHub] Error notifying target user {target_id}: {e}") async def EndWatchingUser(self, client: Client, target_id: int) -> None: + """ + Enhanced EndWatchingUser based on official osu-server-spectator implementation. + Properly cleans up watcher state and notifies target user. + """ user_id = int(client.connection_id) + + logger.info(f"[SpectatorHub] {user_id} ended watching {target_id}") + + # Remove from SignalR group self.remove_from_group(client, self.group_id(target_id)) + + # Remove from our tracked watched users store = self.get_or_create_state(client) store.watched_user.discard(target_id) + + # Notify target user that watcher stopped watching if (target_client := self.get_client_by_id(str(target_id))) is not None: await self.call_noblock(target_client, "UserEndedWatching", user_id) - logger.info(f"[SpectatorHub] {user_id} ended watching {target_id}") + logger.debug(f"[SpectatorHub] Notified {target_id} that {user_id} stopped watching") + else: + logger.debug(f"[SpectatorHub] Target user {target_id} not found for end watching notification")