From e293d7541b5d1c2a416c138fc2a47621a561794e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 15:07:50 +0800 Subject: [PATCH] rollback code --- app/database/email_verification.py | 4 +- app/router/notification/message.py | 19 +- app/router/private/team.py | 22 +- app/service/email_verification_service.py | 13 +- app/service/online_status_maintenance.py | 19 +- app/service/redis_message_system.py | 4 +- app/signalr/hub/multiplayer.py | 804 +----------------- app/signalr/hub/multiplayer_packet_cleaner.py | 221 ----- app/signalr/hub/spectator.py | 132 +-- app/signalr/hub/spectator_buffer.py | 339 -------- app/utils.py | 57 -- docs/multiplayer_packet_cleanup_guide.md | 150 ---- 12 files changed, 74 insertions(+), 1710 deletions(-) delete mode 100644 app/signalr/hub/multiplayer_packet_cleaner.py delete mode 100644 app/signalr/hub/spectator_buffer.py delete mode 100644 docs/multiplayer_packet_cleanup_guide.md diff --git a/app/database/email_verification.py b/app/database/email_verification.py index 30ad6db..baf053b 100644 --- a/app/database/email_verification.py +++ b/app/database/email_verification.py @@ -23,7 +23,7 @@ class EmailVerification(SQLModel, table=True): is_used: bool = Field(default=False) # 是否已使用 used_at: datetime | None = Field(default=None) ip_address: str | None = Field(default=None) # 请求IP - user_agent: str | None = Field(default=None, max_length=255) # 用户代理 + user_agent: str | None = Field(default=None) # 用户代理 class LoginSession(SQLModel, table=True): @@ -35,7 +35,7 @@ class LoginSession(SQLModel, table=True): user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), nullable=False, index=True)) session_token: str = Field(unique=True, index=True) # 会话令牌 ip_address: str = Field() # 登录IP - user_agent: str | None = Field(default=None, max_length=255) # 用户代理(限制长度为255字符) + user_agent: str | None = Field(default=None) country_code: str | None = Field(default=None) is_verified: bool = Field(default=False) # 是否已验证 created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) diff --git a/app/router/notification/message.py b/app/router/notification/message.py index 8dfb35f..390db6b 100644 --- a/app/router/notification/message.py +++ b/app/router/notification/message.py @@ -206,19 +206,12 @@ async def get_message( query = select(ChatMessage).where(ChatMessage.channel_id == channel_id) if since > 0: query = query.where(col(ChatMessage.message_id) > since) - # 获取 since 之后的消息,按时间正序 - query = query.order_by(col(ChatMessage.message_id).asc()).limit(limit) - messages = (await session.exec(query)).all() - resp = [await ChatMessageResp.from_db(msg, session) for msg in messages] - else: - # 获取最新消息,先按倒序获取最新的,然后反转为时间正序 - if until is not None: - query = query.where(col(ChatMessage.message_id) < until) - query = query.order_by(col(ChatMessage.message_id).desc()).limit(limit) - messages = (await session.exec(query)).all() - resp = [await ChatMessageResp.from_db(msg, session) for msg in messages] - # 反转为时间正序(最老的在前面) - resp.reverse() + if until is not None: + query = query.where(col(ChatMessage.message_id) < until) + + query = query.order_by(col(ChatMessage.message_id).desc()).limit(limit) + messages = (await session.exec(query)).all() + resp = [await ChatMessageResp.from_db(msg, session) for msg in messages] return resp diff --git a/app/router/private/team.py b/app/router/private/team.py index f5cbfc3..3f18645 100644 --- a/app/router/private/team.py +++ b/app/router/private/team.py @@ -5,7 +5,7 @@ import hashlib from app.database.lazer_user import BASE_INCLUDES, User, UserResp from app.database.team import Team, TeamMember, TeamRequest -from app.dependencies.database import Database, get_redis +from app.dependencies.database import Database from app.dependencies.storage import get_storage_service from app.dependencies.user import get_client_user from app.models.notification import ( @@ -19,9 +19,8 @@ from app.utils import check_image from .router import router -from fastapi import Depends, File, Form, HTTPException, Request, Security +from fastapi import Depends, File, Form, HTTPException, Path, Request, Security from pydantic import BaseModel -from redis.asyncio import Redis from sqlmodel import exists, select @@ -157,7 +156,7 @@ async def update_team( @router.delete("/team/{team_id}", name="删除战队", status_code=204) async def delete_team( session: Database, - team_id: int, + team_id: int = Path(..., description="战队 ID"), current_user: User = Security(get_client_user), ): team = await session.get(Team, team_id) @@ -185,7 +184,7 @@ class TeamQueryResp(BaseModel): @router.get("/team/{team_id}", name="查询战队", response_model=TeamQueryResp) async def get_team( session: Database, - team_id: int, + team_id: int = Path(..., description="战队 ID"), ): members = ( await session.exec(select(TeamMember).where(TeamMember.team_id == team_id)) @@ -202,9 +201,8 @@ async def get_team( @router.post("/team/{team_id}/request", name="请求加入战队", status_code=204) async def request_join_team( session: Database, - team_id: int, + team_id: int = Path(..., description="战队 ID"), current_user: User = Security(get_client_user), - redis: Redis = Depends(get_redis), ): team = await session.get(Team, team_id) if not team: @@ -237,8 +235,8 @@ async def request_join_team( async def handle_request( req: Request, session: Database, - team_id: int, - user_id: int, + team_id: int = Path(..., description="战队 ID"), + user_id: int = Path(..., description="用户 ID"), current_user: User = Security(get_client_user), ): team = await session.get(Team, team_id) @@ -281,11 +279,11 @@ async def handle_request( await session.commit() -@router.delete("/team/{team_id}/{user_id}", name="踢出成员 / 退出队伍", status_code=204) +@router.delete("/team/{team_id}/{user_id}", name="踢出成员 / 退出战队", status_code=204) async def kick_member( session: Database, - team_id: int, - user_id: int, + team_id: int = Path(..., description="战队 ID"), + user_id: int = Path(..., description="用户 ID"), current_user: User = Security(get_client_user), ): team = await session.get(Team, team_id) diff --git a/app/service/email_verification_service.py b/app/service/email_verification_service.py index afe34ab..5207fa9 100644 --- a/app/service/email_verification_service.py +++ b/app/service/email_verification_service.py @@ -218,10 +218,6 @@ This email was sent automatically, please do not reply. # 生成新的验证码 code = EmailVerificationService.generate_verification_code() - # 解析用户代理字符串 - from app.utils import parse_user_agent - parsed_user_agent = parse_user_agent(user_agent, max_length=255) - # 创建验证记录 verification = EmailVerification( user_id=user_id, @@ -229,7 +225,7 @@ This email was sent automatically, please do not reply. verification_code=code, expires_at=datetime.now(UTC) + timedelta(minutes=10), # 10分钟过期 ip_address=ip_address, - user_agent=parsed_user_agent # 使用解析后的用户代理 + user_agent=user_agent ) db.add(verification) @@ -392,18 +388,13 @@ class LoginSessionService: is_new_location: bool = False ) -> LoginSession: """创建登录会话""" - from app.utils import parse_user_agent - - # 解析用户代理字符串,提取关键信息 - parsed_user_agent = parse_user_agent(user_agent, max_length=255) - session_token = EmailVerificationService.generate_session_token() session = LoginSession( user_id=user_id, session_token=session_token, ip_address=ip_address, - user_agent=parsed_user_agent, # 使用解析后的用户代理 + user_agent=user_agent, country_code=country_code, is_new_location=is_new_location, expires_at=datetime.now(UTC) + timedelta(hours=24), # 24小时过期 diff --git a/app/service/online_status_maintenance.py b/app/service/online_status_maintenance.py index d49ef10..e7a1b05 100644 --- a/app/service/online_status_maintenance.py +++ b/app/service/online_status_maintenance.py @@ -20,7 +20,6 @@ async def maintain_playing_users_online_status(): 定期刷新正在游玩用户的metadata在线标记, 确保他们在游玩过程中显示为在线状态。 - 但不会恢复已经退出的用户的在线状态。 """ redis_sync = get_redis_message() redis_async = get_redis() @@ -32,25 +31,17 @@ async def maintain_playing_users_online_status(): if not playing_users: return - logger.debug(f"Checking online status for {len(playing_users)} playing users") + logger.debug(f"Maintaining online status for {len(playing_users)} playing users") - # 仅为当前有效连接的用户刷新在线状态 - updated_count = 0 + # 为每个游玩用户刷新metadata在线标记 for user_id in playing_users: user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) metadata_key = f"metadata:online:{user_id_str}" - # 重要:首先检查用户是否已经有在线标记,只有存在才刷新 - if await redis_async.exists(metadata_key): - # 只更新已经在线的用户的状态,不恢复已退出的用户 - await redis_async.set(metadata_key, "playing", ex=3600) - updated_count += 1 - else: - # 如果用户已退出(没有在线标记),则从游玩用户中移除 - await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, user_id) - logger.debug(f"Removed user {user_id_str} from playing users as they are offline") + # 设置或刷新metadata在线标记,过期时间为1小时 + await redis_async.set(metadata_key, "playing", ex=3600) - logger.debug(f"Updated metadata online status for {updated_count} playing users") + logger.debug(f"Updated metadata online status for {len(playing_users)} playing users") except Exception as e: logger.error(f"Error maintaining playing users online status: {e}") diff --git a/app/service/redis_message_system.py b/app/service/redis_message_system.py index 46f256f..51c7df6 100644 --- a/app/service/redis_message_system.py +++ b/app/service/redis_message_system.py @@ -358,7 +358,9 @@ class RedisMessageSystem: # 确保消息按ID正序排序(时间顺序) messages.sort(key=lambda x: x.get("message_id", 0)) - return messages + # 如果是获取最新消息(since=0),需要保持倒序(最新的在前面) + if since == 0: + messages.reverse() return messages diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index 59f6106..2ea759a 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -2,9 +2,7 @@ from __future__ import annotations import asyncio from datetime import UTC, datetime, timedelta -from typing import override, Dict, List, Optional, Tuple -import json -from collections import defaultdict, deque +from typing import override from app.database import Room from app.database.beatmap import Beatmap @@ -48,7 +46,6 @@ from app.models.room import ( from app.models.score import GameMode from .hub import Client, Hub -from .multiplayer_packet_cleaner import packet_cleaner, GameSessionCleaner from httpx import HTTPError from sqlalchemy import update @@ -57,282 +54,6 @@ from sqlmodel import col, exists, select GAMEPLAY_LOAD_TIMEOUT = 30 -class GameplayStateBuffer: - """游戏状态缓冲区,用于管理实时排行榜和观战数据同步""" - - def __init__(self): - # 房间ID -> 用户分数数据缓冲区 - self.score_buffers: Dict[int, Dict[int, deque]] = defaultdict(lambda: defaultdict(lambda: deque(maxlen=50))) - # 房间ID -> 实时排行榜数据 - self.leaderboards: Dict[int, List[Dict]] = defaultdict(list) - # 房间ID -> 游戏状态快照 - self.gameplay_snapshots: Dict[int, Dict] = {} - # 用户观战状态缓存 - self.spectator_states: Dict[Tuple[int, int], Dict] = {} # (room_id, user_id) -> state - - async def add_score_frame(self, room_id: int, user_id: int, frame_data: Dict): - """添加分数帧数据到缓冲区""" - self.score_buffers[room_id][user_id].append({ - **frame_data, - 'timestamp': datetime.now(UTC), - 'user_id': user_id - }) - - # 更新实时排行榜 - await self._update_leaderboard(room_id) - - async def _update_leaderboard(self, room_id: int): - """更新实时排行榜""" - leaderboard = [] - - for user_id, frames in self.score_buffers[room_id].items(): - if not frames: - continue - - latest_frame = frames[-1] - leaderboard.append({ - 'user_id': user_id, - 'score': latest_frame.get('score', 0), - 'combo': latest_frame.get('combo', 0), - 'accuracy': latest_frame.get('accuracy', 0.0), - 'completed': latest_frame.get('completed', False), - 'timestamp': latest_frame['timestamp'] - }) - - # 按分数排序 - leaderboard.sort(key=lambda x: (-x['score'], -x['accuracy'])) - self.leaderboards[room_id] = leaderboard - - def get_leaderboard(self, room_id: int) -> List[Dict]: - """获取房间实时排行榜""" - return self.leaderboards.get(room_id, []) - - async def create_gameplay_snapshot(self, room_id: int, room_data: Dict): - """创建游戏状态快照用于新加入的观众""" - # 序列化复杂对象 - serialized_room_data = self._serialize_room_data(room_data) - - snapshot = { - 'room_id': room_id, - 'state': serialized_room_data.get('state'), - 'current_item': serialized_room_data.get('current_item'), - 'users': serialized_room_data.get('users', []), - 'leaderboard': self.get_leaderboard(room_id), - 'created_at': datetime.now(UTC).isoformat() - } - self.gameplay_snapshots[room_id] = snapshot - return snapshot - - def _serialize_room_data(self, room_data: Dict) -> Dict: - """序列化房间数据""" - result = {} - for key, value in room_data.items(): - if hasattr(value, 'value') and hasattr(value, 'name'): - # 枚举类型 - result[key] = {'name': value.name, 'value': value.value} - elif hasattr(value, '__dict__'): - # 复杂对象 - if hasattr(value, 'model_dump'): - result[key] = value.model_dump() - elif hasattr(value, 'dict'): - result[key] = value.dict() - else: - # 手动序列化 - obj_dict = {} - for attr_name, attr_value in value.__dict__.items(): - if not attr_name.startswith('_'): - obj_dict[attr_name] = self._serialize_value(attr_value) - result[key] = obj_dict - elif isinstance(value, (list, tuple)): - result[key] = [self._serialize_value(item) for item in value] - else: - result[key] = self._serialize_value(value) - return result - - def _serialize_value(self, value): - """序列化单个值""" - if hasattr(value, 'value') and hasattr(value, 'name'): - # 枚举类型 - return {'name': value.name, 'value': value.value} - elif hasattr(value, '__dict__'): - # 复杂对象 - if hasattr(value, 'model_dump'): - return value.model_dump() - elif hasattr(value, 'dict'): - return value.dict() - else: - obj_dict = {} - for attr_name, attr_value in value.__dict__.items(): - if not attr_name.startswith('_'): - obj_dict[attr_name] = self._serialize_value(attr_value) - return obj_dict - elif isinstance(value, (list, tuple)): - return [self._serialize_value(item) for item in value] - elif isinstance(value, dict): - return {k: self._serialize_value(v) for k, v in value.items()} - elif isinstance(value, (str, int, float, bool, type(None))): - return value - else: - return str(value) - - def get_gameplay_snapshot(self, room_id: int) -> Optional[Dict]: - """获取游戏状态快照""" - return self.gameplay_snapshots.get(room_id) - - async def set_spectator_state(self, room_id: int, user_id: int, state_data: Dict): - """设置观战者状态""" - key = (room_id, user_id) - self.spectator_states[key] = { - **state_data, - 'last_updated': datetime.now(UTC) - } - - def get_spectator_state(self, room_id: int, user_id: int) -> Optional[Dict]: - """获取观战者状态""" - key = (room_id, user_id) - return self.spectator_states.get(key) - - async def cleanup_room(self, room_id: int): - """清理房间相关数据""" - self.score_buffers.pop(room_id, None) - self.leaderboards.pop(room_id, None) - self.gameplay_snapshots.pop(room_id, None) - - # 清理观战者状态 - keys_to_remove = [key for key in self.spectator_states.keys() if key[0] == room_id] - for key in keys_to_remove: - self.spectator_states.pop(key, None) - - async def cleanup_game_session(self, room_id: int): - """清理单局游戏会话数据(每局游戏结束后调用)""" - # 清理分数缓冲区但保留房间结构 - if room_id in self.score_buffers: - self.score_buffers[room_id].clear() - - # 清理实时排行榜 - self.leaderboards.pop(room_id, None) - - # 清理游戏状态快照 - self.gameplay_snapshots.pop(room_id, None) - - # 清理观战者状态但不删除房间相关键 - keys_to_remove = [] - for key in self.spectator_states.keys(): - if key[0] == room_id: - # 保留连接状态,清理游戏数据 - state = self.spectator_states[key] - if 'game_data' in state: - state.pop('game_data', None) - if 'score_data' in state: - state.pop('score_data', None) - - logger.info(f"[GameplayStateBuffer] Cleaned game session data for room {room_id}") - - def reset_user_gameplay_state(self, room_id: int, user_id: int): - """重置单个用户的游戏状态""" - # 清理用户分数缓冲区 - if room_id in self.score_buffers and user_id in self.score_buffers[room_id]: - self.score_buffers[room_id][user_id].clear() - - # 重置观战者状态中的游戏数据 - key = (room_id, user_id) - if key in self.spectator_states: - state = self.spectator_states[key] - state.pop('game_data', None) - state.pop('score_data', None) - state['last_reset'] = datetime.now(UTC) - - -class SpectatorSyncManager: - """观战同步管理器,处理跨Hub通信""" - - def __init__(self, redis_client): - self.redis = redis_client - self.channel_prefix = "multiplayer_spectator" - - def _serialize_for_json(self, obj): - """递归序列化对象为JSON兼容格式""" - if hasattr(obj, '__dict__'): - # 如果对象有__dict__属性,将其转换为字典 - if hasattr(obj, 'model_dump'): - # 对于Pydantic模型 - return obj.model_dump() - elif hasattr(obj, 'dict'): - # 对于较旧的Pydantic模型 - return obj.dict() - else: - # 对于普通对象 - result = {} - for key, value in obj.__dict__.items(): - if not key.startswith('_'): # 跳过私有属性 - result[key] = self._serialize_for_json(value) - return result - elif isinstance(obj, dict): - return {key: self._serialize_for_json(value) for key, value in obj.items()} - elif isinstance(obj, (list, tuple)): - return [self._serialize_for_json(item) for item in obj] - elif isinstance(obj, datetime): - # 处理datetime对象 - return obj.isoformat() - elif hasattr(obj, 'value') and hasattr(obj, 'name'): - # 对于枚举类型 - return {'name': obj.name, 'value': obj.value} - elif isinstance(obj, (str, int, float, bool, type(None))): - return obj - else: - # 对于其他类型,尝试转换为字符串 - return str(obj) - - async def notify_spectator_hubs(self, room_id: int, event_type: str, data: Dict): - """通知观战Hub游戏状态变化""" - # 序列化复杂对象为JSON兼容格式 - serialized_data = self._serialize_for_json(data) - - message = { - 'room_id': room_id, - 'event_type': event_type, - 'data': serialized_data, - 'timestamp': datetime.now(UTC).isoformat() - } - - channel = f"{self.channel_prefix}:room:{room_id}" - await self.redis.publish(channel, json.dumps(message)) - - async def notify_gameplay_started(self, room_id: int, game_data: Dict): - """通知游戏开始""" - await self.notify_spectator_hubs(room_id, "gameplay_started", game_data) - - async def notify_gameplay_ended(self, room_id: int, results_data: Dict): - """通知游戏结束""" - await self.notify_spectator_hubs(room_id, "gameplay_ended", results_data) - - async def notify_user_state_change(self, room_id: int, user_id: int, old_state: str, new_state: str): - """通知用户状态变化""" - await self.notify_spectator_hubs(room_id, "user_state_changed", { - 'user_id': user_id, - 'old_state': old_state, - 'new_state': new_state - }) - - async def subscribe_to_spectator_events(self, callback): - """订阅观战事件""" - pattern = f"{self.channel_prefix}:*" - pubsub = self.redis.pubsub() - await pubsub.psubscribe(pattern) - - async for message in pubsub.listen(): - if message['type'] == 'pmessage': - try: - data = json.loads(message['data']) - await callback(message['channel'], data) - except Exception as e: - logger.error(f"Error processing spectator event: {e}") - - -# 全局实例 -gameplay_buffer = GameplayStateBuffer() - - class MultiplayerEventLogger: def __init__(self): pass @@ -427,100 +148,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): super().__init__() self.rooms: dict[int, ServerMultiplayerRoom] = {} self.event_logger = MultiplayerEventLogger() - self.spectator_sync_manager: Optional[SpectatorSyncManager] = None - # 实时数据推送任务管理 - self.leaderboard_tasks: Dict[int, asyncio.Task] = {} - # 观战状态同步任务 - self.spectator_sync_tasks: Dict[int, asyncio.Task] = {} - - # 启动定期清理任务(参考osu源码的清理机制) - self._cleanup_task = asyncio.create_task(self._periodic_cleanup()) - - async def _periodic_cleanup(self): - """定期清理过期数据包的后台任务""" - while True: - try: - await asyncio.sleep(60) # 每分钟执行一次 - await packet_cleaner.cleanup_expired_packets() - - # 记录清理统计 - stats = packet_cleaner.get_cleanup_stats() - if stats['pending_packets'] > 0: - logger.debug(f"[MultiplayerHub] Cleanup stats: {stats}") - - except Exception as e: - logger.error(f"[MultiplayerHub] Error in periodic cleanup: {e}") - except asyncio.CancelledError: - logger.info("[MultiplayerHub] Periodic cleanup task cancelled") - break - - async def initialize_managers(self): - """初始化管理器""" - if not self.spectator_sync_manager: - redis = get_redis() - self.spectator_sync_manager = SpectatorSyncManager(redis) - - # 启动观战事件监听 - asyncio.create_task(self.spectator_sync_manager.subscribe_to_spectator_events( - self._handle_spectator_event - )) - - async def _handle_spectator_event(self, channel: str, data: Dict): - """处理观战事件""" - try: - room_id = data.get('room_id') - event_type = data.get('event_type') - event_data = data.get('data', {}) - - if room_id and event_type and room_id in self.rooms: - server_room = self.rooms[room_id] - await self._process_spectator_event(server_room, event_type, event_data) - except Exception as e: - logger.error(f"Error handling spectator event: {e}") - - async def _process_spectator_event(self, server_room: ServerMultiplayerRoom, event_type: str, event_data: Dict): - """处理具体的观战事件""" - room_id = server_room.room.room_id - - if event_type == "spectator_joined": - user_id = event_data.get('user_id') - if user_id: - await self._sync_spectator_with_current_state(room_id, user_id) - - elif event_type == "request_leaderboard": - user_id = event_data.get('user_id') - if user_id: - leaderboard = gameplay_buffer.get_leaderboard(room_id) - await self._send_leaderboard_to_spectator(user_id, leaderboard) - - async def _sync_spectator_with_current_state(self, room_id: int, user_id: int): - """同步观战者与当前游戏状态""" - try: - snapshot = gameplay_buffer.get_gameplay_snapshot(room_id) - if snapshot: - # 通过Redis发送状态同步信息给SpectatorHub - redis = get_redis() - sync_data = { - 'target_user': user_id, - 'snapshot': snapshot, - 'timestamp': datetime.now(UTC).isoformat() - } - await redis.publish(f"spectator_sync:{room_id}", json.dumps(sync_data)) - except Exception as e: - logger.error(f"Error syncing spectator {user_id} with room {room_id}: {e}") - - async def _send_leaderboard_to_spectator(self, user_id: int, leaderboard: List[Dict]): - """发送排行榜数据给观战者""" - try: - redis = get_redis() - leaderboard_data = { - 'target_user': user_id, - 'leaderboard': leaderboard, - 'timestamp': datetime.now(UTC).isoformat() - } - await redis.publish(f"leaderboard_update:{user_id}", json.dumps(leaderboard_data)) - except Exception as e: - logger.error(f"Error sending leaderboard to spectator {user_id}: {e}") @staticmethod def group_id(room: int) -> str: @@ -644,10 +271,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): async def JoinRoomWithPassword(self, client: Client, room_id: int, password: str): logger.info(f"[MultiplayerHub] {client.user_id} joining room {room_id}") - - # 初始化管理器 - await self.initialize_managers() - store = self.get_or_create_state(client) if store.room_id != 0: raise InvokeException("You are already in a room") @@ -670,13 +293,9 @@ class MultiplayerHub(Hub[MultiplayerClientState]): self.add_to_group(client, self.group_id(room_id)) await server_room.match_type_handler.handle_join(user) - # Enhanced: Send current room and gameplay state to new user + # Critical fix: Send current room and gameplay state to new user # This ensures spectators joining ongoing games get proper state sync await self._send_room_state_to_new_user(client, server_room) - - # 如果正在进行游戏,同步游戏状态 - if room.state in [MultiplayerRoomState.PLAYING, MultiplayerRoomState.WAITING_FOR_LOAD]: - await self._sync_new_user_with_gameplay(client, server_room) await self.event_logger.player_joined(room_id, user.user_id) @@ -708,42 +327,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): redis = get_redis() await redis.publish("chat:room:joined", f"{room.channel_id}:{user.user_id}") - - # 通知观战Hub有新用户加入 - if self.spectator_sync_manager: - await self.spectator_sync_manager.notify_spectator_hubs( - room_id, "user_joined", {'user_id': user.user_id} - ) - - return room - - async def _sync_new_user_with_gameplay(self, client: Client, room: ServerMultiplayerRoom): - """同步新用户与正在进行的游戏状态""" - try: - room_id = room.room.room_id - - # 获取游戏状态快照 - snapshot = gameplay_buffer.get_gameplay_snapshot(room_id) - if not snapshot: - # 创建新的快照 - room_data = { - 'state': room.room.state, - 'current_item': room.queue.current_item, - 'users': [{'user_id': u.user_id, 'state': u.state} for u in room.room.users] - } - snapshot = await gameplay_buffer.create_gameplay_snapshot(room_id, room_data) - - # 发送游戏状态到新用户 - await self.broadcast_call(client.connection_id, "GameplayStateSync", snapshot) - - # 发送实时排行榜 - leaderboard = gameplay_buffer.get_leaderboard(room_id) - if leaderboard: - await self.broadcast_call(client.connection_id, "LeaderboardUpdate", leaderboard) - - logger.info(f"[MultiplayerHub] Synced gameplay state for user {client.user_id} in room {room_id}") - except Exception as e: - logger.error(f"Error syncing new user with gameplay: {e}") return room @@ -1121,32 +704,14 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if user.state == state: return - # 记录状态变化用于观战同步 - old_state = user.state - # Special handling for state changes during gameplay match state: case MultiplayerUserState.IDLE: if user.state.is_playing: - # 玩家退出游戏时,清理分数缓冲区 - room_id = room.room_id - if room_id in gameplay_buffer.score_buffers: - gameplay_buffer.score_buffers[room_id].pop(user.user_id, None) - await gameplay_buffer._update_leaderboard(room_id) - await self._broadcast_leaderboard_update(server_room) return case MultiplayerUserState.LOADED | MultiplayerUserState.READY_FOR_GAMEPLAY: if not user.state.is_playing: return - case MultiplayerUserState.PLAYING: - # 开始游戏时初始化分数缓冲区 - room_id = room.room_id - await gameplay_buffer.add_score_frame(room_id, user.user_id, { - 'score': 0, - 'combo': 0, - 'accuracy': 100.0, - 'completed': False - }) logger.info( f"[MultiplayerHub] User {user.user_id} changing state from {user.state} to {state}" @@ -1164,129 +729,19 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if state == MultiplayerUserState.SPECTATING: await self.handle_spectator_state_change(client, server_room, user) - # 通知观战Hub状态变化 - if self.spectator_sync_manager: - await self.spectator_sync_manager.notify_user_state_change( - room.room_id, user.user_id, old_state.name, state.name - ) - await self.update_room_state(server_room) - async def _broadcast_leaderboard_update(self, room: ServerMultiplayerRoom): - """广播实时排行榜更新""" - try: - room_id = room.room.room_id - leaderboard = gameplay_buffer.get_leaderboard(room_id) - - if leaderboard: - await self.broadcast_group_call( - self.group_id(room_id), - "LeaderboardUpdate", - leaderboard - ) - - logger.debug(f"[MultiplayerHub] Broadcasted leaderboard update to room {room_id}") - except Exception as e: - logger.error(f"Error broadcasting leaderboard update: {e}") - - async def _start_leaderboard_broadcast_task(self, room_id: int): - """启动实时排行榜广播任务""" - if room_id in self.leaderboard_tasks: - return - - async def leaderboard_broadcast_loop(): - try: - while room_id in self.rooms and room_id in self.leaderboard_tasks: - if room_id in self.rooms: - server_room = self.rooms[room_id] - if server_room.room.state == MultiplayerRoomState.PLAYING: - await self._broadcast_leaderboard_update(server_room) - - await asyncio.sleep(1.0) # 每秒更新一次排行榜 - except asyncio.CancelledError: - pass - except Exception as e: - logger.error(f"Error in leaderboard broadcast loop for room {room_id}: {e}") - - task = asyncio.create_task(leaderboard_broadcast_loop()) - self.leaderboard_tasks[room_id] = task - - async def _stop_leaderboard_broadcast_task(self, room_id: int): - """停止实时排行榜广播任务""" - if room_id in self.leaderboard_tasks: - task = self.leaderboard_tasks.pop(room_id) - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - async def _cleanup_game_session(self, room_id: int, game_completed: bool): - """清理单局游戏会话数据(基于osu源码实现)""" - try: - # 停止实时排行榜广播 - await self._stop_leaderboard_broadcast_task(room_id) - - # 获取最终排行榜 - final_leaderboard = gameplay_buffer.get_leaderboard(room_id) - - # 发送最终排行榜给所有用户 - if final_leaderboard: - await self.broadcast_group_call( - self.group_id(room_id), - "FinalLeaderboard", - final_leaderboard - ) - - # 使用新的清理管理器清理游戏会话(参考osu源码) - await GameSessionCleaner.cleanup_game_session(room_id, game_completed) - - # 清理游戏会话数据 - await gameplay_buffer.cleanup_game_session(room_id) - - # 通知观战同步管理器游戏结束 - if hasattr(self, 'spectator_sync_manager') and self.spectator_sync_manager: - await self.spectator_sync_manager.notify_gameplay_ended(room_id, { - 'final_leaderboard': final_leaderboard, - 'completed': game_completed, - 'timestamp': datetime.now(UTC).isoformat() - }) - - # 重置所有用户的游戏状态 - if room_id in self.rooms: - room = self.rooms[room_id] - for user in room.room.users: - gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) - # 安排用户会话清理 - await GameSessionCleaner.cleanup_user_session(room_id, user.user_id) - - logger.info(f"[MultiplayerHub] Cleaned up game session for room {room_id} (completed: {game_completed})") - - except Exception as e: - logger.error(f"[MultiplayerHub] Failed to cleanup game session for room {room_id}: {e}") - # 即使清理失败也不应该影响游戏流程 - async def change_user_state( self, room: ServerMultiplayerRoom, user: MultiplayerRoomUser, state: MultiplayerUserState, ): - old_state = user.state - logger.info( f"[MultiplayerHub] {user.user_id}'s state " - f"changed from {old_state} to {state}" + f"changed from {user.state} to {state}" ) - user.state = state - - # 在用户进入RESULTS状态时清理其游戏数据(参考osu源码) - if state == MultiplayerUserState.RESULTS and old_state.is_playing: - room_id = room.room.room_id - gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) - logger.debug(f"[MultiplayerHub] Reset gameplay state for user {user.user_id} in room {room_id}") - await self.broadcast_group_call( self.group_id(room.room.room_id), "UserStateChanged", @@ -1547,10 +1002,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): # This ensures cross-hub spectating works properly await self._notify_spectator_hub_game_ended(room) - # 每局游戏结束后的清理工作 - room_id = room.room.room_id - await self._cleanup_game_session(room_id, any_user_finished_playing) - if any_user_finished_playing: await self.event_logger.game_completed( room.room.room_id, @@ -1566,44 +1017,16 @@ class MultiplayerHub(Hub[MultiplayerClientState]): async def change_room_state( self, room: ServerMultiplayerRoom, state: MultiplayerRoomState ): - old_state = room.room.state - room_id = room.room.room_id - logger.debug( - f"[MultiplayerHub] Room {room_id} state " - f"changed from {old_state} to {state}" + f"[MultiplayerHub] Room {room.room.room_id} state " + f"changed from {room.room.state} to {state}" ) - room.room.state = state await self.broadcast_group_call( - self.group_id(room_id), + self.group_id(room.room.room_id), "RoomStateChanged", state, ) - - # 处理状态变化的特殊逻辑 - if old_state == MultiplayerRoomState.PLAYING and state == MultiplayerRoomState.OPEN: - # 游戏结束,停止实时排行榜广播 - await self._stop_leaderboard_broadcast_task(room_id) - - # 发送最终排行榜 - leaderboard = gameplay_buffer.get_leaderboard(room_id) - if leaderboard: - await self.broadcast_group_call( - self.group_id(room_id), - "FinalLeaderboard", - leaderboard - ) - - # 通知观战Hub游戏结束 - if self.spectator_sync_manager: - await self.spectator_sync_manager.notify_gameplay_ended(room_id, { - 'leaderboard': leaderboard - }) - - elif state == MultiplayerRoomState.PLAYING: - # 游戏开始,启动实时排行榜 - await self._start_leaderboard_broadcast_task(room_id) async def StartMatch(self, client: Client): server_room = self._ensure_in_room(client) @@ -1676,8 +1099,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): await room.stop_all_countdowns(ForceGameplayStartCountdown) playing = False played_user = 0 - room_id = room.room.room_id - for user in room.room.users: client = self.get_client_by_id(str(user.user_id)) if client is None: @@ -1691,15 +1112,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): played_user += 1 await self.change_user_state(room, user, MultiplayerUserState.PLAYING) await self.call_noblock(client, "GameplayStarted") - - # 初始化玩家分数缓冲区 - await gameplay_buffer.add_score_frame(room_id, user.user_id, { - 'score': 0, - 'combo': 0, - 'accuracy': 100.0, - 'completed': False - }) - elif user.state == MultiplayerUserState.WAITING_FOR_LOAD: await self.change_user_state(room, user, MultiplayerUserState.IDLE) await self.broadcast_group_call( @@ -1707,28 +1119,11 @@ class MultiplayerHub(Hub[MultiplayerClientState]): "GameplayAborted", GameplayAbortReason.LOAD_TOOK_TOO_LONG, ) - await self.change_room_state( room, (MultiplayerRoomState.PLAYING if playing else MultiplayerRoomState.OPEN), ) - if playing: - # 创建游戏状态快照 - room_data = { - 'state': room.room.state, - 'current_item': room.queue.current_item, - 'users': [{'user_id': u.user_id, 'state': u.state} for u in room.room.users] - } - await gameplay_buffer.create_gameplay_snapshot(room_id, room_data) - - # 启动实时排行榜广播 - await self._start_leaderboard_broadcast_task(room_id) - - # 通知观战Hub游戏开始 - if self.spectator_sync_manager: - await self.spectator_sync_manager.notify_gameplay_started(room_id, room_data) - redis = get_redis() await redis.set( f"multiplayer:{room.room.room_id}:gameplay:players", @@ -1754,40 +1149,31 @@ class MultiplayerHub(Hub[MultiplayerClientState]): user: MultiplayerRoomUser, kicked: bool = False, ): - room_id = room.room.room_id - user_id = user.user_id - if client: - self.remove_from_group(client, self.group_id(room_id)) + self.remove_from_group(client, self.group_id(room.room.room_id)) room.room.users.remove(user) - target_store = self.state.get(user_id) + target_store = self.state.get(user.user_id) if target_store: target_store.room_id = 0 - # 清理用户的游戏状态数据(参考osu源码) - gameplay_buffer.reset_user_gameplay_state(room_id, user_id) - - # 使用清理管理器安排用户会话清理 - await GameSessionCleaner.cleanup_user_session(room_id, user_id) - redis = get_redis() - await redis.publish("chat:room:left", f"{room.room.channel_id}:{user_id}") + await redis.publish("chat:room:left", f"{room.room.channel_id}:{user.user_id}") async with with_db() as session: async with session.begin(): participated_user = ( await session.exec( select(RoomParticipatedUser).where( - RoomParticipatedUser.room_id == room_id, - RoomParticipatedUser.user_id == user_id, + RoomParticipatedUser.room_id == room.room.room_id, + RoomParticipatedUser.user_id == user.user_id, ) ) ).first() if participated_user is not None: participated_user.left_at = datetime.now(UTC) - db_room = await session.get(Room, room_id) + db_room = await session.get(Room, room.room.room_id) if db_room is None: raise InvokeException("Room does not exist in database") if db_room.participant_count > 0: @@ -1800,7 +1186,7 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if ( len(room.room.users) != 0 and room.room.host - and room.room.host.user_id == user_id + and room.room.host.user_id == user.user_id ): next_host = room.room.users[0] await self.set_host(room, next_host) @@ -1809,11 +1195,11 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if client: await self.call_noblock(client, "UserKicked", user) await self.broadcast_group_call( - self.group_id(room_id), "UserKicked", user + self.group_id(room.room.room_id), "UserKicked", user ) else: await self.broadcast_group_call( - self.group_id(room_id), "UserLeft", user + self.group_id(room.room.room_id), "UserLeft", user ) async def end_room(self, room: ServerMultiplayerRoom): @@ -1838,106 +1224,8 @@ class MultiplayerHub(Hub[MultiplayerClientState]): room.room.room_id, room.room.host.user_id, ) - - room_id = room.room.room_id - - # 清理实时数据 - await self._stop_leaderboard_broadcast_task(room_id) - await gameplay_buffer.cleanup_room(room_id) - - # 使用清理管理器完全清理房间(参考osu源码) - await GameSessionCleaner.cleanup_room_fully(room_id) - - # 清理观战同步任务 - if room_id in self.spectator_sync_tasks: - task = self.spectator_sync_tasks.pop(room_id) - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - del self.rooms[room_id] - logger.info(f"[MultiplayerHub] Room {room_id} ended") - - async def UpdateScore(self, client: Client, score_data: Dict): - """接收并处理实时分数更新""" - try: - server_room = self._ensure_in_room(client) - room = server_room.room - user = next((u for u in room.users if u.user_id == client.user_id), None) - - if user is None: - raise InvokeException("User not found in room") - - if room.state != MultiplayerRoomState.PLAYING: - return - - if user.state != MultiplayerUserState.PLAYING: - return - - room_id = room.room_id - - # 添加分数帧到缓冲区 - await gameplay_buffer.add_score_frame(room_id, client.user_id, { - 'score': score_data.get('score', 0), - 'combo': score_data.get('combo', 0), - 'accuracy': score_data.get('accuracy', 0.0), - 'completed': score_data.get('completed', False), - 'hp': score_data.get('hp', 1.0), - 'position': score_data.get('position', 0) - }) - - # 安排分数数据包清理(参考osu源码的数据包管理) - await packet_cleaner.schedule_cleanup(room_id, { - 'type': 'score', - 'user_id': client.user_id, - 'data_size': len(str(score_data)), - 'timestamp': datetime.now(UTC).isoformat() - }) - - # 如果游戏完成,标记用户状态 - if score_data.get('completed', False): - await self.change_user_state( - server_room, user, MultiplayerUserState.FINISHED_PLAY - ) - - # 立即安排该用户的清理 - await GameSessionCleaner.cleanup_user_session(room_id, client.user_id) - - except Exception as e: - logger.error(f"Error updating score for user {client.user_id}: {e}") - - async def GetLeaderboard(self, client: Client) -> List[Dict]: - """获取当前房间的实时排行榜""" - try: - server_room = self._ensure_in_room(client) - room_id = server_room.room.room_id - return gameplay_buffer.get_leaderboard(room_id) - except Exception as e: - logger.error(f"Error getting leaderboard for user {client.user_id}: {e}") - return [] - - async def RequestSpectatorSync(self, client: Client): - """观战者请求同步当前游戏状态""" - try: - server_room = self._ensure_in_room(client) - room_id = server_room.room.room_id - - # 发送游戏状态快照 - snapshot = gameplay_buffer.get_gameplay_snapshot(room_id) - if snapshot: - await self.broadcast_call(client.connection_id, "GameplayStateSync", snapshot) - - # 发送当前排行榜 - leaderboard = gameplay_buffer.get_leaderboard(room_id) - if leaderboard: - await self.broadcast_call(client.connection_id, "LeaderboardUpdate", leaderboard) - - logger.info(f"[MultiplayerHub] Sent spectator sync to user {client.user_id}") - - except Exception as e: - logger.error(f"Error handling spectator sync request: {e}") + del self.rooms[room.room.room_id] + logger.info(f"[MultiplayerHub] Room {room.room.room_id} ended") async def LeaveRoom(self, client: Client): store = self.get_or_create_state(client) @@ -2016,18 +1304,12 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if not user.state.is_playing: raise InvokeException("Cannot abort gameplay while not in a gameplay state") - # 清理用户游戏数据(参考osu源码) - room_id = room.room_id - gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) - await self.change_user_state( server_room, user, MultiplayerUserState.IDLE, ) await self.update_room_state(server_room) - - logger.info(f"[MultiplayerHub] User {user.user_id} aborted gameplay in room {room_id}") async def AbortMatch(self, client: Client): server_room = self._ensure_in_room(client) @@ -2040,13 +1322,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): ): raise InvokeException("Cannot abort a match that hasn't started.") - room_id = room.room_id - - # 清理所有玩家的游戏状态数据(参考osu源码) - for user in room.users: - if user.state.is_playing: - gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) - await asyncio.gather( *[ self.change_user_state(server_room, u, MultiplayerUserState.IDLE) @@ -2054,18 +1329,14 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if u.state.is_playing ] ) - - # 执行完整的游戏会话清理 - await self._cleanup_game_session(room_id, False) # False表示游戏被中断而非完成 - await self.broadcast_group_call( - self.group_id(room_id), + self.group_id(room.room_id), "GameplayAborted", GameplayAbortReason.HOST_ABORTED, ) await self.update_room_state(server_room) logger.info( - f"[MultiplayerHub] {client.user_id} aborted match in room {room_id}" + f"[MultiplayerHub] {client.user_id} aborted match in room {room.room_id}" ) async def change_user_match_state( @@ -2243,9 +1514,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): # Import here to avoid circular imports from app.signalr.hub import SpectatorHubs from app.models.spectator_hub import SpectatedUserState, SpectatorState - from .spectator_buffer import spectator_state_manager - - room_id = room.room.room_id # For each user who finished the game, notify SpectatorHub for room_user in room.room.users: @@ -2259,12 +1527,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): maximum_statistics={}, ) - # 同步到观战缓冲区管理器 - await spectator_state_manager.handle_user_finished_playing( - room_user.user_id, - finished_state - ) - # Notify all SpectatorHub watchers that this user finished await SpectatorHubs.broadcast_group_call( SpectatorHubs.group_id(room_user.user_id), @@ -2274,35 +1536,9 @@ class MultiplayerHub(Hub[MultiplayerClientState]): ) logger.debug( - f"[MultiplayerHub] Synced and notified SpectatorHub that user {room_user.user_id} finished game" + f"[MultiplayerHub] Notified SpectatorHub that user {room_user.user_id} finished game" ) - # 同步游戏中玩家的状态 - elif room_user.state == MultiplayerUserState.PLAYING: - try: - multiplayer_data = { - 'room_id': room_id, - 'beatmap_id': room.queue.current_item.beatmap_id, - 'ruleset_id': room_user.ruleset_id or 0, - 'mods': room_user.mods, - 'state': room_user.state, - 'maximum_statistics': {} - } - - await spectator_state_manager.sync_with_multiplayer( - room_user.user_id, - multiplayer_data - ) - - logger.debug( - f"[MultiplayerHub] Synced playing state for user {room_user.user_id} to SpectatorHub buffer" - ) - - except Exception as e: - logger.debug( - f"[MultiplayerHub] Failed to sync playing state for user {room_user.user_id}: {e}" - ) - except Exception as e: logger.debug( f"[MultiplayerHub] Failed to notify SpectatorHub about game end: {e}" diff --git a/app/signalr/hub/multiplayer_packet_cleaner.py b/app/signalr/hub/multiplayer_packet_cleaner.py deleted file mode 100644 index 8fa5c3e..0000000 --- a/app/signalr/hub/multiplayer_packet_cleaner.py +++ /dev/null @@ -1,221 +0,0 @@ -""" -多人游戏数据包清理管理器 -基于osu-server源码实现的数据包清理逻辑 -""" - -from __future__ import annotations - -import asyncio -from datetime import UTC, datetime, timedelta -from typing import Dict, List, Optional, Set -from collections import defaultdict -import logging - -logger = logging.getLogger(__name__) - - -class MultiplayerPacketCleaner: - """多人游戏数据包清理管理器(基于osu源码设计)""" - - def __init__(self): - # 待清理的数据包队列 - self.cleanup_queue: Dict[int, List[Dict]] = defaultdict(list) - # 清理任务映射 - self.cleanup_tasks: Dict[int, asyncio.Task] = {} - # 延迟清理时间(秒) - self.cleanup_delay = 5.0 - # 强制清理时间(秒) - self.force_cleanup_delay = 30.0 - - async def schedule_cleanup(self, room_id: int, packet_data: Dict): - """安排数据包清理(参考osu源码的清理调度)""" - self.cleanup_queue[room_id].append({ - **packet_data, - 'scheduled_at': datetime.now(UTC), - 'room_id': room_id - }) - - # 如果没有正在进行的清理任务,开始新的清理任务 - if room_id not in self.cleanup_tasks or self.cleanup_tasks[room_id].done(): - self.cleanup_tasks[room_id] = asyncio.create_task( - self._delayed_cleanup_task(room_id) - ) - logger.debug(f"[PacketCleaner] Scheduled cleanup task for room {room_id}") - - async def _delayed_cleanup_task(self, room_id: int): - """延迟清理任务(类似osu源码的延迟清理机制)""" - try: - # 等待延迟时间 - await asyncio.sleep(self.cleanup_delay) - - # 执行清理 - await self._execute_cleanup(room_id) - - except asyncio.CancelledError: - logger.debug(f"[PacketCleaner] Cleanup task for room {room_id} was cancelled") - raise - except Exception as e: - logger.error(f"[PacketCleaner] Error during cleanup for room {room_id}: {e}") - - async def _execute_cleanup(self, room_id: int): - """执行实际的清理操作""" - if room_id not in self.cleanup_queue: - return - - packets_to_clean = self.cleanup_queue.pop(room_id, []) - if not packets_to_clean: - return - - logger.info(f"[PacketCleaner] Cleaning {len(packets_to_clean)} packets for room {room_id}") - - # 按类型分组处理清理 - score_packets = [] - state_packets = [] - leaderboard_packets = [] - - for packet in packets_to_clean: - packet_type = packet.get('type', 'unknown') - if packet_type == 'score': - score_packets.append(packet) - elif packet_type == 'state': - state_packets.append(packet) - elif packet_type == 'leaderboard': - leaderboard_packets.append(packet) - - # 清理分数数据包 - if score_packets: - await self._cleanup_score_packets(room_id, score_packets) - - # 清理状态数据包 - if state_packets: - await self._cleanup_state_packets(room_id, state_packets) - - # 清理排行榜数据包 - if leaderboard_packets: - await self._cleanup_leaderboard_packets(room_id, leaderboard_packets) - - async def _cleanup_score_packets(self, room_id: int, packets: List[Dict]): - """清理分数相关数据包""" - user_ids = set(p.get('user_id') for p in packets if p.get('user_id')) - logger.debug(f"[PacketCleaner] Cleaning score packets for {len(user_ids)} users in room {room_id}") - - # 这里可以添加具体的清理逻辑,比如: - # - 清理过期的分数帧 - # - 压缩历史分数数据 - # - 清理缓存 - - async def _cleanup_state_packets(self, room_id: int, packets: List[Dict]): - """清理状态相关数据包""" - logger.debug(f"[PacketCleaner] Cleaning {len(packets)} state packets for room {room_id}") - - # 这里可以添加状态清理逻辑,比如: - # - 清理过期状态数据 - # - 重置用户状态缓存 - - async def _cleanup_leaderboard_packets(self, room_id: int, packets: List[Dict]): - """清理排行榜相关数据包""" - logger.debug(f"[PacketCleaner] Cleaning {len(packets)} leaderboard packets for room {room_id}") - - # 这里可以添加排行榜清理逻辑 - - async def force_cleanup(self, room_id: int): - """强制立即清理指定房间的数据包""" - # 取消延迟清理任务 - if room_id in self.cleanup_tasks: - task = self.cleanup_tasks.pop(room_id) - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - # 立即执行清理 - await self._execute_cleanup(room_id) - logger.info(f"[PacketCleaner] Force cleaned packets for room {room_id}") - - async def cleanup_all_for_room(self, room_id: int): - """清理房间的所有数据包(房间结束时调用)""" - await self.force_cleanup(room_id) - - # 清理任务引用 - self.cleanup_tasks.pop(room_id, None) - self.cleanup_queue.pop(room_id, None) - - logger.info(f"[PacketCleaner] Completed full cleanup for room {room_id}") - - async def cleanup_expired_packets(self): - """定期清理过期的数据包""" - current_time = datetime.now(UTC) - expired_rooms = [] - - for room_id, packets in self.cleanup_queue.items(): - # 查找超过强制清理时间的数据包 - expired_packets = [ - p for p in packets - if (current_time - p['scheduled_at']).total_seconds() > self.force_cleanup_delay - ] - - if expired_packets: - expired_rooms.append(room_id) - - # 强制清理过期数据包 - for room_id in expired_rooms: - await self.force_cleanup(room_id) - - def get_cleanup_stats(self) -> Dict: - """获取清理统计信息""" - return { - 'active_cleanup_tasks': len([t for t in self.cleanup_tasks.values() if not t.done()]), - 'pending_packets': sum(len(packets) for packets in self.cleanup_queue.values()), - 'rooms_with_pending_cleanup': len(self.cleanup_queue), - } - - -# 全局实例 -packet_cleaner = MultiplayerPacketCleaner() - - -class GameSessionCleaner: - """游戏会话清理器(参考osu源码的会话管理)""" - - @staticmethod - async def cleanup_game_session(room_id: int, game_completed: bool = False): - """清理游戏会话数据(每局游戏结束后调用)""" - try: - # 安排数据包清理 - await packet_cleaner.schedule_cleanup(room_id, { - 'type': 'game_session_end', - 'completed': game_completed, - 'timestamp': datetime.now(UTC).isoformat() - }) - - logger.info(f"[GameSessionCleaner] Scheduled cleanup for game session in room {room_id} (completed: {game_completed})") - - except Exception as e: - logger.error(f"[GameSessionCleaner] Failed to cleanup game session for room {room_id}: {e}") - - @staticmethod - async def cleanup_user_session(room_id: int, user_id: int): - """清理单个用户的会话数据""" - try: - await packet_cleaner.schedule_cleanup(room_id, { - 'type': 'user_session_end', - 'user_id': user_id, - 'timestamp': datetime.now(UTC).isoformat() - }) - - logger.debug(f"[GameSessionCleaner] Scheduled cleanup for user {user_id} in room {room_id}") - - except Exception as e: - logger.error(f"[GameSessionCleaner] Failed to cleanup user session {user_id} in room {room_id}: {e}") - - @staticmethod - async def cleanup_room_fully(room_id: int): - """完全清理房间数据(房间关闭时调用)""" - try: - await packet_cleaner.cleanup_all_for_room(room_id) - logger.info(f"[GameSessionCleaner] Completed full room cleanup for {room_id}") - - except Exception as e: - logger.error(f"[GameSessionCleaner] Failed to fully cleanup room {room_id}: {e}") diff --git a/app/signalr/hub/spectator.py b/app/signalr/hub/spectator.py index 48d01fd..b83585e 100644 --- a/app/signalr/hub/spectator.py +++ b/app/signalr/hub/spectator.py @@ -34,7 +34,6 @@ from app.models.spectator_hub import ( from app.utils import unix_timestamp_to_windows from .hub import Client, Hub -from .spectator_buffer import spectator_state_manager from httpx import HTTPError from sqlalchemy.orm import joinedload @@ -204,22 +203,8 @@ class SpectatorHub(Hub[StoreClientState]): # Send all current player states to the new client # This matches the official OnConnectedAsync behavior active_states = [] - - # 首先从缓冲区获取状态 - buffer_stats = spectator_state_manager.get_buffer_stats() - if buffer_stats['active_users'] > 0: - logger.debug(f"[SpectatorHub] Found {buffer_stats['active_users']} users in buffer") - - # 获取缓冲区中的所有活跃用户 - active_users = spectator_state_manager.buffer.get_all_active_users() - for user_id in active_users: - state = spectator_state_manager.buffer.get_user_state(user_id) - if state and state.state == SpectatedUserState.Playing: - active_states.append((user_id, state)) - - # 然后从本地状态获取 for user_id, store in self.state.items(): - if store.state is not None and user_id not in [state[0] for state in active_states]: + if store.state is not None: active_states.append((user_id, store.state)) if active_states: @@ -256,41 +241,24 @@ class SpectatorHub(Hub[StoreClientState]): and room_user.user_id not in self.state ): # Create a synthetic SpectatorState for multiplayer players - # 关键修复:处理多人游戏中不同用户可能选择不同谱面的情况 + # This helps with cross-hub spectating try: - # 获取用户选择的谱面ID(如果是自由选择模式) - user_beatmap_id = getattr(room_user, 'beatmap_id', None) or server_room.queue.current_item.beatmap_id - user_ruleset_id = room_user.ruleset_id or server_room.queue.current_item.ruleset_id or 0 - user_mods = room_user.mods or [] - synthetic_state = SpectatorState( - beatmap_id=user_beatmap_id, - ruleset_id=user_ruleset_id, - mods=user_mods, + beatmap_id=server_room.queue.current_item.beatmap_id, + ruleset_id=room_user.ruleset_id or 0, # Default to osu! + mods=room_user.mods, state=SpectatedUserState.Playing, maximum_statistics={}, ) - # 同步到缓冲区管理器 - multiplayer_data = { - 'room_id': room_id, - 'beatmap_id': user_beatmap_id, - 'ruleset_id': user_ruleset_id, - 'mods': user_mods, - 'state': room_user.state, - 'maximum_statistics': {}, - 'is_multiplayer': True - } - await spectator_state_manager.sync_with_multiplayer(room_user.user_id, multiplayer_data) - await self.call_noblock( client, "UserBeganPlaying", room_user.user_id, synthetic_state, ) - logger.info( - f"[SpectatorHub] Sent synthetic multiplayer state for user {room_user.user_id} (beatmap: {user_beatmap_id}, ruleset: {user_ruleset_id})" + logger.debug( + f"[SpectatorHub] Sent synthetic multiplayer state for user {room_user.user_id}" ) except Exception as e: logger.debug( @@ -312,9 +280,6 @@ class SpectatorHub(Hub[StoreClientState]): maximum_statistics={}, ) - # 也同步结束状态到缓冲区 - await spectator_state_manager.handle_user_finished_playing(room_user.user_id, finished_state) - await self.call_noblock( client, "UserFinishedPlaying", @@ -386,15 +351,6 @@ class SpectatorHub(Hub[StoreClientState]): # # 预缓存beatmap文件以加速后续PP计算 # await self._preload_beatmap_for_pp_calculation(state.beatmap_id) - # 更新缓冲区状态 - session_data = { - 'beatmap_checksum': store.checksum, - 'score_token': score_token, - 'username': name, - 'started_at': time.time() - } - await spectator_state_manager.handle_user_began_playing(user_id, state, session_data) - await self.broadcast_group_call( self.group_id(user_id), "UserBeganPlaying", @@ -421,9 +377,6 @@ class SpectatorHub(Hub[StoreClientState]): score_info.statistics = header.statistics store.score.replay_frames.extend(frame_data.frames) - # 更新缓冲区的帧数据 - await spectator_state_manager.handle_frame_data(user_id, frame_data) - await self.broadcast_group_call( self.group_id(user_id), "UserSentFrames", user_id, frame_data ) @@ -605,9 +558,6 @@ class SpectatorHub(Hub[StoreClientState]): self.tasks.add(task) task.add_done_callback(self.tasks.discard) - # 通知缓冲区管理器用户结束游戏 - await spectator_state_manager.handle_user_finished_playing(user_id, state) - logger.info( f"[SpectatorHub] {user_id} finished playing {state.beatmap_id} " f"with {state.state}" @@ -628,54 +578,27 @@ class SpectatorHub(Hub[StoreClientState]): logger.info(f"[SpectatorHub] {user_id} started watching {target_id}") - # 使用缓冲区管理器处理观战开始,获取追赶数据 - catchup_bundle = await spectator_state_manager.handle_spectator_start_watching(user_id, target_id) - try: - # 首先尝试从缓冲区获取状态 - buffered_state = spectator_state_manager.buffer.get_user_state(target_id) - - if buffered_state and buffered_state.state == SpectatedUserState.Playing: - logger.info( - f"[SpectatorHub] Sending buffered state for {target_id} to spectator {user_id} " - f"(beatmap: {buffered_state.beatmap_id}, ruleset: {buffered_state.ruleset_id})" - ) - await self.call_noblock(client, "UserBeganPlaying", target_id, buffered_state) - - # 发送最近的帧数据以帮助同步 - recent_frames = spectator_state_manager.buffer.get_recent_frames(target_id, 10) - for frame_data in recent_frames: - try: - await self.call_noblock(client, "UserSentFrames", target_id, frame_data) - except Exception as e: - logger.debug(f"[SpectatorHub] Failed to send frame data: {e}") - - # 如果有追赶数据包,发送额外的同步信息 - if catchup_bundle: - multiplayer_data = catchup_bundle.get('multiplayer_data') - if multiplayer_data and multiplayer_data.get('is_multiplayer'): - logger.info( - f"[SpectatorHub] Sending multiplayer sync data for {target_id} " - f"(room: {multiplayer_data.get('room_id')})" - ) - else: - # 尝试从本地状态获取 - target_store = self.state.get(target_id) - if target_store and target_store.state: - # CRITICAL FIX: Only send state if user is actually playing - # Don't send state for finished/quit games - if target_store.state.state == SpectatedUserState.Playing: - logger.debug(f"[SpectatorHub] {target_id} is currently playing, sending local state") - await self.call_noblock(client, "UserBeganPlaying", target_id, target_store.state) - else: - logger.debug(f"[SpectatorHub] {target_id} state is {target_store.state.state}, not sending to watcher") + # Get target user's current state if it exists + target_store = self.state.get(target_id) + if target_store and target_store.state: + # CRITICAL FIX: Only send state if user is actually playing + # Don't send state for finished/quit games + if target_store.state.state == SpectatedUserState.Playing: + logger.debug( + f"[SpectatorHub] {target_id} is currently playing, sending state" + ) + # Send current state to the watcher immediately + await self.call_noblock( + client, + "UserBeganPlaying", + target_id, + target_store.state, + ) else: - # 检查多人游戏同步缓存 - multiplayer_data = spectator_state_manager.buffer.get_multiplayer_sync_data(target_id) - if multiplayer_data: - logger.debug(f"[SpectatorHub] Sending multiplayer sync data for {target_id}") - # 这里可以发送多人游戏的状态信息 - + logger.debug( + f"[SpectatorHub] {target_id} state is {target_store.state.state}, not sending to watcher" + ) except Exception as e: # User isn't tracked or error occurred - this is not critical logger.debug(f"[SpectatorHub] Could not get state for {target_id}: {e}") @@ -721,9 +644,6 @@ class SpectatorHub(Hub[StoreClientState]): logger.info(f"[SpectatorHub] {user_id} ended watching {target_id}") - # 使用缓冲区管理器处理观战结束 - await spectator_state_manager.handle_spectator_stop_watching(user_id, target_id) - # Remove from SignalR group self.remove_from_group(client, self.group_id(target_id)) diff --git a/app/signalr/hub/spectator_buffer.py b/app/signalr/hub/spectator_buffer.py deleted file mode 100644 index 13db6aa..0000000 --- a/app/signalr/hub/spectator_buffer.py +++ /dev/null @@ -1,339 +0,0 @@ -""" -观战Hub缓冲区管理器 -解决第一局游戏结束后观战和排行榜不同步的问题 -""" - -from __future__ import annotations - -import asyncio -from datetime import UTC, datetime, timedelta -from typing import Dict, List, Optional, Tuple, Set -from collections import defaultdict, deque -import logging - -from app.models.spectator_hub import SpectatorState, FrameDataBundle, SpectatedUserState -from app.models.multiplayer_hub import MultiplayerUserState - -logger = logging.getLogger(__name__) - - -class SpectatorBuffer: - """观战数据缓冲区,解决观战状态不同步问题""" - - def __init__(self): - # 用户ID -> 游戏状态缓存 - self.user_states: Dict[int, SpectatorState] = {} - - # 用户ID -> 帧数据缓冲区 (保留最近的帧数据) - self.frame_buffers: Dict[int, deque] = defaultdict(lambda: deque(maxlen=30)) - - # 用户ID -> 最后活跃时间 - self.last_activity: Dict[int, datetime] = {} - - # 用户ID -> 观战者列表 - self.spectators: Dict[int, Set[int]] = defaultdict(set) - - # 用户ID -> 游戏会话信息 - self.session_info: Dict[int, Dict] = {} - - # 多人游戏同步缓存 - self.multiplayer_sync_cache: Dict[int, Dict] = {} # user_id -> multiplayer_data - - # 缓冲区过期时间(分钟) - self.buffer_expire_time = 10 - - def update_user_state(self, user_id: int, state: SpectatorState, session_data: Optional[Dict] = None): - """更新用户状态到缓冲区""" - self.user_states[user_id] = state - self.last_activity[user_id] = datetime.now(UTC) - - if session_data: - self.session_info[user_id] = session_data - - logger.debug(f"[SpectatorBuffer] Updated state for user {user_id}: {state.state}") - - def add_frame_data(self, user_id: int, frame_data: FrameDataBundle): - """添加帧数据到缓冲区""" - self.frame_buffers[user_id].append({ - 'data': frame_data, - 'timestamp': datetime.now(UTC) - }) - self.last_activity[user_id] = datetime.now(UTC) - - def get_user_state(self, user_id: int) -> Optional[SpectatorState]: - """获取用户当前状态""" - return self.user_states.get(user_id) - - def get_recent_frames(self, user_id: int, count: int = 10) -> List[FrameDataBundle]: - """获取用户最近的帧数据""" - frames = self.frame_buffers.get(user_id, deque()) - recent_frames = list(frames)[-count:] if len(frames) >= count else list(frames) - return [frame['data'] for frame in recent_frames] - - def add_spectator(self, user_id: int, spectator_id: int): - """添加观战者""" - self.spectators[user_id].add(spectator_id) - logger.debug(f"[SpectatorBuffer] Added spectator {spectator_id} to user {user_id}") - - def remove_spectator(self, user_id: int, spectator_id: int): - """移除观战者""" - self.spectators[user_id].discard(spectator_id) - logger.debug(f"[SpectatorBuffer] Removed spectator {spectator_id} from user {user_id}") - - def get_spectators(self, user_id: int) -> Set[int]: - """获取用户的所有观战者""" - return self.spectators.get(user_id, set()) - - def clear_user_data(self, user_id: int): - """清理用户数据(游戏结束时调用,但保留一段时间用于观战同步)""" - # 不立即删除,而是标记为已结束,延迟清理 - if user_id in self.user_states: - current_state = self.user_states[user_id] - if current_state.state == SpectatedUserState.Playing: - # 将状态标记为已结束,但保留在缓冲区中 - current_state.state = SpectatedUserState.Passed # 或其他结束状态 - self.user_states[user_id] = current_state - logger.debug(f"[SpectatorBuffer] Marked user {user_id} as finished, keeping in buffer") - - def cleanup_expired_data(self): - """清理过期数据""" - current_time = datetime.now(UTC) - expired_users = [] - - for user_id, last_time in self.last_activity.items(): - if (current_time - last_time).total_seconds() > self.buffer_expire_time * 60: - expired_users.append(user_id) - - for user_id in expired_users: - self._force_clear_user(user_id) - logger.debug(f"[SpectatorBuffer] Cleaned expired data for user {user_id}") - - def _force_clear_user(self, user_id: int): - """强制清理用户数据""" - self.user_states.pop(user_id, None) - self.frame_buffers.pop(user_id, None) - self.last_activity.pop(user_id, None) - self.spectators.pop(user_id, None) - self.session_info.pop(user_id, None) - self.multiplayer_sync_cache.pop(user_id, None) - - def sync_multiplayer_state(self, user_id: int, multiplayer_data: Dict): - """同步多人游戏状态""" - self.multiplayer_sync_cache[user_id] = { - **multiplayer_data, - 'synced_at': datetime.now(UTC) - } - logger.debug(f"[SpectatorBuffer] Synced multiplayer state for user {user_id}") - - def get_multiplayer_sync_data(self, user_id: int) -> Optional[Dict]: - """获取多人游戏同步数据""" - return self.multiplayer_sync_cache.get(user_id) - - def has_active_spectators(self, user_id: int) -> bool: - """检查用户是否有活跃的观战者""" - return len(self.spectators.get(user_id, set())) > 0 - - def get_all_active_users(self) -> List[int]: - """获取所有活跃用户""" - current_time = datetime.now(UTC) - active_users = [] - - for user_id, last_time in self.last_activity.items(): - if (current_time - last_time).total_seconds() < 300: # 5分钟内活跃 - active_users.append(user_id) - - return active_users - - def create_catchup_bundle(self, user_id: int) -> Optional[Dict]: - """为新观战者创建追赶数据包""" - if user_id not in self.user_states: - return None - - state = self.user_states[user_id] - recent_frames = self.get_recent_frames(user_id, 20) # 获取最近20帧 - session_data = self.session_info.get(user_id, {}) - - return { - 'user_id': user_id, - 'state': state, - 'recent_frames': recent_frames, - 'session_info': session_data, - 'multiplayer_data': self.get_multiplayer_sync_data(user_id), - 'created_at': datetime.now(UTC).isoformat() - } - - def get_buffer_stats(self) -> Dict: - """获取缓冲区统计信息""" - return { - 'active_users': len(self.user_states), - 'total_spectators': sum(len(specs) for specs in self.spectators.values()), - 'buffered_frames': sum(len(frames) for frames in self.frame_buffers.values()), - 'multiplayer_synced_users': len(self.multiplayer_sync_cache) - } - - -class SpectatorStateManager: - """观战状态管理器,处理状态同步和缓冲""" - - def __init__(self): - self.buffer = SpectatorBuffer() - self.cleanup_task: Optional[asyncio.Task] = None - self.start_cleanup_task() - - def start_cleanup_task(self): - """启动定期清理任务""" - if self.cleanup_task is None or self.cleanup_task.done(): - self.cleanup_task = asyncio.create_task(self._periodic_cleanup()) - - async def _periodic_cleanup(self): - """定期清理过期数据""" - while True: - try: - await asyncio.sleep(300) # 每5分钟清理一次 - self.buffer.cleanup_expired_data() - - stats = self.buffer.get_buffer_stats() - if stats['active_users'] > 0: - logger.debug(f"[SpectatorStateManager] Buffer stats: {stats}") - - except Exception as e: - logger.error(f"[SpectatorStateManager] Error in periodic cleanup: {e}") - except asyncio.CancelledError: - logger.info("[SpectatorStateManager] Periodic cleanup task cancelled") - break - - async def handle_user_began_playing(self, user_id: int, state: SpectatorState, session_data: Optional[Dict] = None): - """处理用户开始游戏""" - self.buffer.update_user_state(user_id, state, session_data) - - # 如果有观战者,发送追赶数据 - spectators = self.buffer.get_spectators(user_id) - if spectators: - logger.debug(f"[SpectatorStateManager] User {user_id} has {len(spectators)} spectators, maintaining buffer") - - async def handle_user_finished_playing(self, user_id: int, final_state: SpectatorState): - """处理用户结束游戏""" - # 更新为结束状态,但保留在缓冲区中以便观战者同步 - self.buffer.update_user_state(user_id, final_state) - - # 如果有观战者,保持数据在缓冲区中更长时间 - if self.buffer.has_active_spectators(user_id): - logger.debug(f"[SpectatorStateManager] User {user_id} finished, keeping data for spectators") - else: - # 延迟清理 - asyncio.create_task(self._delayed_cleanup_user(user_id, 60)) # 60秒后清理 - - async def _delayed_cleanup_user(self, user_id: int, delay_seconds: int): - """延迟清理用户数据""" - await asyncio.sleep(delay_seconds) - if not self.buffer.has_active_spectators(user_id): - self.buffer.clear_user_data(user_id) - logger.debug(f"[SpectatorStateManager] Delayed cleanup for user {user_id}") - - async def handle_frame_data(self, user_id: int, frame_data: FrameDataBundle): - """处理帧数据""" - self.buffer.add_frame_data(user_id, frame_data) - - async def handle_spectator_start_watching(self, spectator_id: int, target_id: int) -> Optional[Dict]: - """处理观战者开始观看,返回追赶数据包""" - self.buffer.add_spectator(target_id, spectator_id) - - # 为新观战者创建追赶数据包 - catchup_bundle = self.buffer.create_catchup_bundle(target_id) - - if catchup_bundle: - logger.debug(f"[SpectatorStateManager] Created catchup bundle for spectator {spectator_id} watching {target_id}") - - return catchup_bundle - - async def handle_spectator_stop_watching(self, spectator_id: int, target_id: int): - """处理观战者停止观看""" - self.buffer.remove_spectator(target_id, spectator_id) - - async def sync_with_multiplayer(self, user_id: int, multiplayer_data: Dict): - """与多人游戏模式同步""" - self.buffer.sync_multiplayer_state(user_id, multiplayer_data) - - beatmap_id = multiplayer_data.get('beatmap_id') - ruleset_id = multiplayer_data.get('ruleset_id', 0) - - logger.info( - f"[SpectatorStateManager] Syncing multiplayer data for user {user_id}: " - f"beatmap={beatmap_id}, ruleset={ruleset_id}" - ) - - # 如果用户没有在SpectatorHub中但在多人游戏中,创建同步状态 - if user_id not in self.buffer.user_states: - try: - synthetic_state = SpectatorState( - beatmap_id=beatmap_id, - ruleset_id=ruleset_id, - mods=multiplayer_data.get('mods', []), - state=self._convert_multiplayer_state(multiplayer_data.get('state')), - maximum_statistics=multiplayer_data.get('maximum_statistics', {}), - ) - - await self.handle_user_began_playing(user_id, synthetic_state, { - 'source': 'multiplayer', - 'room_id': multiplayer_data.get('room_id'), - 'beatmap_id': beatmap_id, - 'ruleset_id': ruleset_id, - 'is_multiplayer': multiplayer_data.get('is_multiplayer', True), - 'synced_at': datetime.now(UTC).isoformat() - }) - - logger.info( - f"[SpectatorStateManager] Created synthetic state for multiplayer user {user_id} " - f"(beatmap: {beatmap_id}, ruleset: {ruleset_id})" - ) - - except Exception as e: - logger.error(f"[SpectatorStateManager] Failed to create synthetic state for user {user_id}: {e}") - else: - # 更新现有状态 - existing_state = self.buffer.user_states[user_id] - if existing_state.beatmap_id != beatmap_id or existing_state.ruleset_id != ruleset_id: - logger.info( - f"[SpectatorStateManager] Updating state for user {user_id}: " - f"beatmap {existing_state.beatmap_id} -> {beatmap_id}, " - f"ruleset {existing_state.ruleset_id} -> {ruleset_id}" - ) - - # 更新状态以匹配多人游戏 - existing_state.beatmap_id = beatmap_id - existing_state.ruleset_id = ruleset_id - existing_state.mods = multiplayer_data.get('mods', []) - - self.buffer.update_user_state(user_id, existing_state) - - def _convert_multiplayer_state(self, mp_state) -> SpectatedUserState: - """将多人游戏状态转换为观战状态""" - if not mp_state: - return SpectatedUserState.Playing - - # 假设mp_state是MultiplayerUserState类型 - if hasattr(mp_state, 'name'): - state_name = mp_state.name - if 'PLAYING' in state_name: - return SpectatedUserState.Playing - elif 'RESULTS' in state_name: - return SpectatedUserState.Passed - elif 'FAILED' in state_name: - return SpectatedUserState.Failed - elif 'QUIT' in state_name: - return SpectatedUserState.Quit - - return SpectatedUserState.Playing # 默认状态 - - def get_buffer_stats(self) -> Dict: - """获取缓冲区统计信息""" - return self.buffer.get_buffer_stats() - - def stop_cleanup_task(self): - """停止清理任务""" - if self.cleanup_task and not self.cleanup_task.done(): - self.cleanup_task.cancel() - - -# 全局实例 -spectator_state_manager = SpectatorStateManager() diff --git a/app/utils.py b/app/utils.py index 683b448..b4cc162 100644 --- a/app/utils.py +++ b/app/utils.py @@ -130,63 +130,6 @@ def truncate(text: str, limit: int = 100, ellipsis: str = "...") -> str: return text - -def parse_user_agent(user_agent: str | None, max_length: int = 255) -> str | None: - """ - 解析用户代理字符串,提取关键信息:设备、系统、浏览器 - - 参数: - user_agent: 用户代理字符串 - max_length: 最大长度限制 - - 返回: - 简化后的用户代理字符串 - """ - if user_agent is None: - return None - - # 检查是否是 osu! 客户端 - if "osu!" in user_agent.lower(): - return "osu!" - - # 提取关键信息 - parsed_info = [] - - # 提取设备信息 - device_matches = [ - # 常见移动设备型号 - r"(iPhone|iPad|iPod|Android|ALI-AN00|SM-\w+|MI \w+|Redmi|HUAWEI|HONOR|POCO)", - # 其他设备关键词 - r"(Windows NT|Macintosh|Linux|Ubuntu)" - ] - - import re - for pattern in device_matches: - matches = re.findall(pattern, user_agent) - if matches: - parsed_info.extend(matches) - - # 提取浏览器信息 - browser_matches = [ - r"(Chrome|Firefox|Safari|Edge|MSIE|MQQBrowser|MiuiBrowser|OPR|Opera)", - r"(WebKit|Gecko|Trident)" - ] - - for pattern in browser_matches: - matches = re.findall(pattern, user_agent) - if matches: - # 只取第一个匹配的浏览器 - parsed_info.append(matches[0]) - break - - # 组合信息 - if parsed_info: - result = " / ".join(set(parsed_info)) - return truncate(result, max_length - 3, "...") - - # 如果无法解析,则截断原始字符串 - return truncate(user_agent, max_length - 3, "...") - def check_image(content: bytes, size: int, width: int, height: int) -> None: if len(content) > size: # 10MB limit raise HTTPException(status_code=400, detail="File size exceeds 10MB limit") diff --git a/docs/multiplayer_packet_cleanup_guide.md b/docs/multiplayer_packet_cleanup_guide.md deleted file mode 100644 index b10db0c..0000000 --- a/docs/multiplayer_packet_cleanup_guide.md +++ /dev/null @@ -1,150 +0,0 @@ -# 多人游戏数据包清理系统使用指南 - -## 概述 - -基于osu-server源码实现的多人游戏数据包清理系统,确保每局游戏结束后自动清理转发包和游戏状态数据,防止内存泄漏和性能问题。 - -## 主要改进 - -### 1. GameplayStateBuffer 增强功能 - -- **`cleanup_game_session(room_id)`**: 每局游戏结束后清理会话数据 -- **`reset_user_gameplay_state(room_id, user_id)`**: 重置单个用户的游戏状态 -- **保留房间结构**: 清理数据但保持房间活跃状态 - -### 2. MultiplayerPacketCleaner 数据包清理管理器 - -- **延迟清理**: 避免游戏过程中的性能影响 -- **分类清理**: 按数据包类型(score、state、leaderboard)分类处理 -- **强制清理**: 处理过期数据包 -- **统计监控**: 提供清理统计信息 - -### 3. GameSessionCleaner 会话清理器 - -- **`cleanup_game_session(room_id, completed)`**: 游戏会话清理 -- **`cleanup_user_session(room_id, user_id)`**: 用户会话清理 -- **`cleanup_room_fully(room_id)`**: 房间完全清理 - -## 清理触发点 - -### 1. 每局游戏结束 -```python -# 在 update_room_state() 中,当所有玩家完成游戏时 -await self._cleanup_game_session(room_id, any_user_finished_playing) -``` - -### 2. 用户离开房间 -```python -# 在 make_user_leave() 中清理用户数据 -await GameSessionCleaner.cleanup_user_session(room_id, user_id) -``` - -### 3. 用户中断游戏 -```python -# 在 AbortGameplay() 中重置用户状态 -gameplay_buffer.reset_user_gameplay_state(room_id, user.user_id) -``` - -### 4. 主机中断比赛 -```python -# 在 AbortMatch() 中清理所有玩家数据 -await self._cleanup_game_session(room_id, False) # False = 游戏被中断 -``` - -### 5. 房间关闭 -```python -# 在 end_room() 中完全清理房间 -await GameSessionCleaner.cleanup_room_fully(room_id) -``` - -## 自动化机制 - -### 1. 定期清理任务 -- 每分钟检查并清理过期数据包 -- 防止内存泄漏 - -### 2. 数据包调度清理 -- 5秒延迟清理(避免影响实时性能) -- 30秒强制清理(防止数据积累) - -### 3. 状态感知清理 -- 游戏状态变化时自动触发相应清理 -- 用户状态转换时重置游戏数据 - -## 性能优化 - -### 1. 异步清理 -- 所有清理操作都是异步的 -- 不阻塞游戏主流程 - -### 2. 延迟执行 -- 延迟清理避免频繁操作 -- 批量处理提高效率 - -### 3. 分级清理 -- 用户级别清理(个人数据) -- 会话级别清理(单局游戏) -- 房间级别清理(整个房间) - -## 监控和调试 - -### 1. 清理统计 -```python -stats = packet_cleaner.get_cleanup_stats() -# 返回: active_cleanup_tasks, pending_packets, rooms_with_pending_cleanup -``` - -### 2. 日志记录 -- 详细的清理操作日志 -- 错误处理和恢复机制 - -### 3. 错误处理 -- 清理失败不影响游戏流程 -- 优雅降级处理 - -## 与osu源码的对比 - -### 相似之处 -1. **延迟清理机制**: 类似osu的清理调度 -2. **分类数据包管理**: 按类型处理不同数据包 -3. **状态感知清理**: 根据游戏状态触发清理 -4. **错误隔离**: 清理错误不影响游戏 - -### Python特定优化 -1. **异步协程**: 使用async/await提高并发性能 -2. **字典缓存**: 使用defaultdict优化数据结构 -3. **生成器**: 使用deque限制缓冲区大小 -4. **上下文管理**: 自动资源清理 - -## 最佳实践 - -### 1. 及时清理 -- 游戏结束立即触发会话清理 -- 用户离开立即清理个人数据 - -### 2. 渐进式清理 -- 先清理用户数据 -- 再清理会话数据 -- 最后清理房间数据 - -### 3. 监控资源使用 -- 定期检查清理统计 -- 监控内存和CPU使用 - -### 4. 日志分析 -- 分析清理频率和效果 -- 优化清理策略 - -## 配置选项 - -```python -# 数据包清理配置 -cleanup_delay = 5.0 # 延迟清理时间(秒) -force_cleanup_delay = 30.0 # 强制清理时间(秒) -leaderboard_broadcast_interval = 2.0 # 排行榜广播间隔(秒) - -# 缓冲区大小限制 -score_buffer_maxlen = 50 # 分数帧缓冲区最大长度 -``` - -通过这个系统,你的Python多人游戏实现现在具备了与osu源码类似的数据包清理能力,确保每局游戏结束后自动清理转发包,防止内存泄漏并保持系统性能。