diff --git a/app/router/notification/message.py b/app/router/notification/message.py
index 00354e1..187203e 100644
--- a/app/router/notification/message.py
+++ b/app/router/notification/message.py
@@ -92,6 +92,19 @@ async def send_message(
     channel_name = db_channel.name
     user_id = current_user.id
 
+    # For multiplayer rooms, check the Redis key before sending the message
+    if channel_type == ChannelType.MULTIPLAYER:
+        try:
+            from app.dependencies.database import get_redis
+            redis = get_redis()
+            key = f"channel:{channel_id}:messages"
+            key_type = await redis.type(key)
+            if key_type not in ["none", "zset"]:
+                logger.warning(f"Fixing Redis key {key} with wrong type: {key_type}")
+                await redis.delete(key)
+        except Exception as e:
+            logger.warning(f"Failed to check/fix Redis key for channel {channel_id}: {e}")
+
     # Send the message via the Redis message system - returns immediately
     resp = await redis_message_system.send_message(
         channel_id=channel_id,
diff --git a/app/router/notification/server.py b/app/router/notification/server.py
index d4810a0..9259c46 100644
--- a/app/router/notification/server.py
+++ b/app/router/notification/server.py
@@ -76,6 +76,20 @@ class ChatServer:
     async def broadcast(self, channel_id: int, event: ChatEvent):
         users_in_channel = self.channels.get(channel_id, [])
         logger.info(f"Broadcasting to channel {channel_id}, users: {users_in_channel}")
+
+        # If there are no users in the channel, check whether it is a multiplayer channel
+        if not users_in_channel:
+            try:
+                async with with_db() as session:
+                    from sqlmodel import select
+                    channel = await session.get(ChatChannel, channel_id)
+                    if channel and channel.type == ChannelType.MULTIPLAYER:
+                        logger.warning(f"No users in multiplayer channel {channel_id}, message will not be delivered to anyone")
+                        # For multiplayer rooms this may be normal (all users have left the room),
+                        # but we still log it for debugging.
+            except Exception as e:
+                logger.error(f"Failed to check channel type for {channel_id}: {e}")
+
         for user_id in users_in_channel:
             await self.send_event(user_id, event)
             logger.debug(f"Sent event to user {user_id} in channel {channel_id}")
@@ -199,12 +213,15 @@ class ChatServer:
             # Use an explicit query to avoid lazy loading
             db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.channel_id == channel_id))).first()
             if db_channel is None:
+                logger.warning(f"User {user_id} attempted to join non-existent channel {channel_id}")
                 return
 
             user = await session.get(User, user_id)
             if user is None:
+                logger.warning(f"Non-existent user {user_id} attempted to join channel {channel_id}")
                 return
 
+            logger.info(f"User {user_id} joining channel {channel_id} (type: {db_channel.type.value})")
             await self.join_channel(user, db_channel, session)
 
     async def leave_room_channel(self, channel_id: int, user_id: int):
@@ -212,12 +229,15 @@ class ChatServer:
             # Use an explicit query to avoid lazy loading
             db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.channel_id == channel_id))).first()
             if db_channel is None:
+                logger.warning(f"User {user_id} attempted to leave non-existent channel {channel_id}")
                 return
 
             user = await session.get(User, user_id)
             if user is None:
+                logger.warning(f"Non-existent user {user_id} attempted to leave channel {channel_id}")
                 return
 
+            logger.info(f"User {user_id} leaving channel {channel_id} (type: {db_channel.type.value})")
             await self.leave_channel(user, db_channel, session)
 
     async def new_private_notification(self, detail: NotificationDetail):
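Note: the key-type guard added in message.py mirrors the cleanup logic that redis_message_system.py already performs. If the duplication becomes a maintenance burden, it could be factored into a small shared helper along the lines of the sketch below (a sketch only, assuming an async redis-py client created with decode_responses=True; the name ensure_zset_key is hypothetical and not part of this patch):

# Sketch of a shared guard, not part of the patch. Assumes redis.asyncio with
# decode_responses=True, so TYPE returns "zset"/"none" as str; the helper name
# ensure_zset_key is hypothetical.
from redis.asyncio import Redis


async def ensure_zset_key(redis: Redis, key: str) -> None:
    """Delete `key` if it exists with a type other than zset."""
    key_type = await redis.type(key)
    if key_type not in ("none", "zset"):
        # Wrong type (e.g. a leftover string or list); drop it so a later ZADD
        # cannot fail with WRONGTYPE.
        await redis.delete(key)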
f"channel:{channel_id}:messages" - # 检查键的类型,如果不是 zset 类型则删除 + # 更健壮的键类型检查和清理 try: key_type = await self._redis_exec(self.redis.type, channel_messages_key) - if key_type and key_type != "zset": + if key_type == "none": + # 键不存在,这是正常的 + pass + elif key_type != "zset": + # 键类型错误,需要清理 logger.warning(f"Deleting Redis key {channel_messages_key} with wrong type: {key_type}") await self._redis_exec(self.redis.delete, channel_messages_key) + + # 验证删除是否成功 + verify_type = await self._redis_exec(self.redis.type, channel_messages_key) + if verify_type != "none": + logger.error(f"Failed to delete problematic key {channel_messages_key}, type is still {verify_type}") + # 强制删除 + await self._redis_exec(self.redis.unlink, channel_messages_key) + except Exception as type_check_error: logger.warning(f"Failed to check key type for {channel_messages_key}: {type_check_error}") - # 如果检查失败,直接删除键以确保清理 - await self._redis_exec(self.redis.delete, channel_messages_key) + # 如果检查失败,尝试强制删除键以确保清理 + try: + await self._redis_exec(self.redis.delete, channel_messages_key) + except Exception: + # 最后的努力:使用unlink + try: + await self._redis_exec(self.redis.unlink, channel_messages_key) + except Exception as final_error: + logger.error(f"Critical: Unable to clear problematic key {channel_messages_key}: {final_error}") # 添加到频道消息列表(sorted set) - await self._redis_exec( - self.redis.zadd, - channel_messages_key, - {f"msg:{channel_id}:{message_id}": message_id}, - ) + try: + await self._redis_exec( + self.redis.zadd, + channel_messages_key, + {f"msg:{channel_id}:{message_id}": message_id}, + ) + except Exception as zadd_error: + logger.error(f"Failed to add message to sorted set {channel_messages_key}: {zadd_error}") + # 如果添加失败,再次尝试清理并重试 + await self._redis_exec(self.redis.delete, channel_messages_key) + await self._redis_exec( + self.redis.zadd, + channel_messages_key, + {f"msg:{channel_id}:{message_id}": message_id}, + ) # 保持频道消息列表大小(最多1000条) await self._redis_exec(self.redis.zremrangebyrank, channel_messages_key, 0, -1001) @@ -516,6 +545,8 @@ class RedisMessageSystem: self._batch_timer = asyncio.create_task(self._batch_persist_to_database()) # 启动时初始化消息ID计数器 bg_tasks.add_task(self._initialize_message_counter) + # 启动定期清理任务 + bg_tasks.add_task(self._periodic_cleanup) logger.info("Redis message system started") async def _initialize_message_counter(self): @@ -553,25 +584,67 @@ class RedisMessageSystem: keys_pattern = "channel:*:messages" keys = await self._redis_exec(self.redis.keys, keys_pattern) + fixed_count = 0 for key in keys: if isinstance(key, bytes): key = key.decode("utf-8") try: key_type = await self._redis_exec(self.redis.type, key) - if key_type and key_type != "zset": + if key_type == "none": + # 键不存在,正常情况 + continue + elif key_type != "zset": logger.warning(f"Cleaning up Redis key {key} with wrong type: {key_type}") await self._redis_exec(self.redis.delete, key) + + # 验证删除是否成功 + verify_type = await self._redis_exec(self.redis.type, key) + if verify_type != "none": + logger.error(f"Failed to delete problematic key {key}, trying unlink...") + await self._redis_exec(self.redis.unlink, key) + + fixed_count += 1 except Exception as cleanup_error: logger.warning(f"Failed to cleanup key {key}: {cleanup_error}") # 强制删除问题键 - await self._redis_exec(self.redis.delete, key) + try: + await self._redis_exec(self.redis.delete, key) + fixed_count += 1 + except Exception: + try: + await self._redis_exec(self.redis.unlink, key) + fixed_count += 1 + except Exception as final_error: + logger.error(f"Critical: Unable to clear problematic key 
{key}: {final_error}") - logger.info("Redis keys cleanup completed") + if fixed_count > 0: + logger.info(f"Redis keys cleanup completed, fixed {fixed_count} keys") + else: + logger.debug("Redis keys cleanup completed, no issues found") except Exception as e: logger.error(f"Failed to cleanup Redis keys: {e}") + async def _periodic_cleanup(self): + """定期清理任务""" + while self._running: + try: + # 每5分钟执行一次清理 + await asyncio.sleep(300) + if not self._running: + break + + logger.debug("Running periodic Redis keys cleanup...") + await self._cleanup_redis_keys() + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Periodic cleanup error: {e}") + # 出错后等待1分钟再重试 + await asyncio.sleep(60) + def stop(self): """停止系统""" if self._running: diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index a5eb244..ea23fb8 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -1018,6 +1018,18 @@ class MultiplayerHub(Hub[MultiplayerClientState]): played_user, ex=3600, ) + + # Ensure spectator hub is aware of all active players for the new game. + # This helps spectators receive score data for every participant, + # especially in subsequent rounds where state may get out of sync. + for room_user in room.room.users: + if (client := self.get_client_by_id(str(room_user.user_id))) is not None: + try: + await self._sync_with_spectator_hub(client, room) + except Exception as e: + logger.debug( + f"[MultiplayerHub] Failed to resync spectator hub for user {room_user.user_id}: {e}" + ) else: await room.queue.finish_current_item() diff --git a/app/signalr/hub/spectator.py b/app/signalr/hub/spectator.py index fa40f1e..e278adc 100644 --- a/app/signalr/hub/spectator.py +++ b/app/signalr/hub/spectator.py @@ -278,7 +278,19 @@ class SpectatorHub(Hub[StoreClientState]): user_id = int(client.connection_id) store = self.get_or_create_state(client) if store.state is not None: - return + logger.warning(f"[SpectatorHub] User {user_id} began new session without ending previous one; cleaning up") + try: + await self._end_session(user_id, store.state, store) + from app.router.private.stats import remove_playing_user + + bg_tasks.add_task(remove_playing_user, user_id) + finally: + store.state = None + store.beatmap_status = None + store.checksum = None + store.ruleset_id = None + store.score_token = None + store.score = None if state.beatmap_id is None or state.ruleset_id is None: return @@ -540,27 +552,33 @@ class SpectatorHub(Hub[StoreClientState]): try: # Get target user's current state if it exists target_store = self.state.get(target_id) - if target_store and target_store.state: - # CRITICAL FIX: Only send state if user is actually playing - # Don't send state for finished/quit games - if target_store.state.state == SpectatedUserState.Playing: - logger.debug(f"[SpectatorHub] {target_id} is currently playing, sending state") - # Send current state to the watcher immediately - await self.call_noblock( - client, - "UserBeganPlaying", - target_id, - target_store.state, - ) - else: - logger.debug( - f"[SpectatorHub] {target_id} state is {target_store.state.state}, not sending to watcher" - ) + if not target_store or not target_store.state: + logger.info(f"[SpectatorHub] Rejecting watch request for {target_id}: user not playing") + raise InvokeException("Target user is not currently playing") + + if target_store.state.state != SpectatedUserState.Playing: + logger.info( + f"[SpectatorHub] Rejecting watch request for {target_id}: state is 
{target_store.state.state}" + ) + raise InvokeException("Target user is not currently playing") + + logger.debug(f"[SpectatorHub] {target_id} is currently playing, sending state") + # Send current state to the watcher immediately + await self.call_noblock( + client, + "UserBeganPlaying", + target_id, + target_store.state, + ) + except InvokeException: + # Re-raise to inform caller without adding to group + raise except Exception as e: # User isn't tracked or error occurred - this is not critical logger.debug(f"[SpectatorHub] Could not get state for {target_id}: {e}") + raise InvokeException("Target user is not currently playing") from e - # Add watcher to our tracked users + # Add watcher to our tracked users only after validation store = self.get_or_create_state(client) store.watched_user.add(target_id)