From b300ce9b09e99bbf6b363d259ee8b3197d3c67d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 13:52:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E4=BA=BA=E6=B8=B8?= =?UTF-8?q?=E6=88=8F=E6=8E=92=E8=A1=8C=E6=A6=9C=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- MULTIPLAYER_IMPROVEMENTS.md | 167 ++++++ app/database/email_verification.py | 4 +- app/database/lazer_user.py | 53 +- app/service/email_verification_service.py | 13 +- app/service/online_status_maintenance.py | 19 +- app/service/realtime_online_cleanup.py | 196 ------- app/service/stats_cleanup.py | 60 +-- app/service/stats_scheduler.py | 12 +- app/signalr/hub/metadata.py | 11 - app/signalr/hub/multiplayer.py | 486 +++++++++++++++++- .../hub/spectator_multiplayer_integration.py | 251 +++++++++ app/utils.py | 57 ++ main.py | 3 - 13 files changed, 1008 insertions(+), 324 deletions(-) create mode 100644 MULTIPLAYER_IMPROVEMENTS.md create mode 100644 app/signalr/hub/spectator_multiplayer_integration.py diff --git a/MULTIPLAYER_IMPROVEMENTS.md b/MULTIPLAYER_IMPROVEMENTS.md new file mode 100644 index 0000000..aa88a6b --- /dev/null +++ b/MULTIPLAYER_IMPROVEMENTS.md @@ -0,0 +1,167 @@ +# 多人游戏观战和实时排行榜改进说明 + +## 主要改进 + +### 1. 游戏状态缓冲区 (GameplayStateBuffer) +- **实时分数缓冲**: 为每个房间的每个玩家维护最多50帧的分数数据 +- **实时排行榜**: 自动计算和维护实时排行榜数据 +- **游戏状态快照**: 为新加入的观众创建完整的游戏状态快照 +- **观战者状态缓存**: 跟踪观战者状态以优化同步 + +### 2. 观战同步管理器 (SpectatorSyncManager) +- **跨Hub通信**: 通过Redis在MultiplayerHub和SpectatorHub之间同步状态 +- **事件通知**: 游戏开始/结束、用户状态变化等事件的实时通知 +- **异步消息处理**: 订阅和处理观战相关事件 + +### 3. 增强的MultiplayerHub功能 + +#### 新增方法: +- `UpdateScore(client, score_data)`: 接收实时分数更新 +- `GetLeaderboard(client)`: 获取当前排行榜 +- `RequestSpectatorSync(client)`: 观战者请求状态同步 + +#### 改进的方法: +- `JoinRoomWithPassword`: 增强新用户加入时的状态同步 +- `ChangeState`: 添加观战状态处理和分数缓冲区管理 +- `start_gameplay`: 启动实时排行榜广播和创建游戏快照 +- `change_room_state`: 处理游戏结束时的清理工作 + +### 4. 实时排行榜系统 +- **自动广播**: 每秒更新一次实时排行榜 +- **智能启停**: 根据游戏状态自动启动/停止广播任务 +- **最终排行榜**: 游戏结束时发送最终排行榜 + +## 客户端集成示例 + +### JavaScript客户端示例 +```javascript +// 连接到MultiplayerHub +const connection = new signalR.HubConnectionBuilder() + .withUrl("/multiplayer") + .build(); + +// 监听实时排行榜更新 +connection.on("LeaderboardUpdate", (leaderboard) => { + updateLeaderboardUI(leaderboard); +}); + +// 监听游戏状态同步(观战者) +connection.on("GameplayStateSync", (snapshot) => { + syncSpectatorUI(snapshot); +}); + +// 监听最终排行榜 +connection.on("FinalLeaderboard", (finalLeaderboard) => { + showFinalResults(finalLeaderboard); +}); + +// 发送分数更新(玩家) +async function updateScore(scoreData) { + try { + await connection.invoke("UpdateScore", scoreData); + } catch (err) { + console.error("Error updating score:", err); + } +} + +// 请求观战同步(观战者) +async function requestSpectatorSync() { + try { + await connection.invoke("RequestSpectatorSync"); + } catch (err) { + console.error("Error requesting sync:", err); + } +} + +// 获取当前排行榜 +async function getCurrentLeaderboard() { + try { + return await connection.invoke("GetLeaderboard"); + } catch (err) { + console.error("Error getting leaderboard:", err); + return []; + } +} +``` + +### Python客户端示例 +```python +import signalrcore + +# 创建连接 +connection = signalrcore.HubConnectionBuilder() \ + .with_url("ws://localhost:8000/multiplayer") \ + .build() + +# 监听排行榜更新 +def on_leaderboard_update(leaderboard): + print("Leaderboard update:", leaderboard) + # 更新UI显示排行榜 + +connection.on("LeaderboardUpdate", on_leaderboard_update) + +# 监听游戏状态同步 +def on_gameplay_state_sync(snapshot): + print("Gameplay state sync:", snapshot) + # 同步观战界面 + +connection.on("GameplayStateSync", on_gameplay_state_sync) + +# 发送分数更新 +async def send_score_update(score, combo, accuracy): + await connection.send("UpdateScore", { + "score": score, + "combo": combo, + "accuracy": accuracy, + "completed": False + }) + +# 启动连接 +connection.start() +``` + +## 配置要求 + +### Redis配置 +确保Redis服务器运行并配置正确的连接参数: +```python +# 在app/dependencies/database.py中 +REDIS_CONFIG = { + 'host': 'localhost', + 'port': 6379, + 'db': 0, + 'decode_responses': True +} +``` + +### 数据库表结构 +确保`multiplayer_event`表包含以下字段: +- `event_detail`: JSON字段,用于存储事件详细信息 + +## 性能优化建议 + +1. **缓冲区大小调整**: 根据实际需求调整分数帧缓冲区大小(默认50帧) +2. **广播频率调整**: 可以根据网络条件调整排行榜广播频率(默认1秒) +3. **内存清理**: 定期清理过期的游戏状态快照和观战者状态 +4. **连接池优化**: 配置Redis连接池以处理高并发请求 + +## 故障排除 + +### 常见问题 +1. **排行榜不更新**: 检查Redis连接和广播任务状态 +2. **观战者状态不同步**: 确认SpectatorSyncManager已正确初始化 +3. **分数数据丢失**: 检查缓冲区大小和清理逻辑 + +### 日志监控 +关键日志点: +- `[MultiplayerHub] Synced gameplay state for user X` +- `[MultiplayerHub] Broadcasted leaderboard update to room X` +- `Error updating score for user X` +- `Error in leaderboard broadcast loop` + +### 调试模式 +在开发环境中启用详细日志: +```python +import logging +logging.getLogger("app.signalr.hub.multiplayer").setLevel(logging.DEBUG) +``` diff --git a/app/database/email_verification.py b/app/database/email_verification.py index baf053b..30ad6db 100644 --- a/app/database/email_verification.py +++ b/app/database/email_verification.py @@ -23,7 +23,7 @@ class EmailVerification(SQLModel, table=True): is_used: bool = Field(default=False) # 是否已使用 used_at: datetime | None = Field(default=None) ip_address: str | None = Field(default=None) # 请求IP - user_agent: str | None = Field(default=None) # 用户代理 + user_agent: str | None = Field(default=None, max_length=255) # 用户代理 class LoginSession(SQLModel, table=True): @@ -35,7 +35,7 @@ class LoginSession(SQLModel, table=True): user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), nullable=False, index=True)) session_token: str = Field(unique=True, index=True) # 会话令牌 ip_address: str = Field() # 登录IP - user_agent: str | None = Field(default=None) + user_agent: str | None = Field(default=None, max_length=255) # 用户代理(限制长度为255字符) country_code: str | None = Field(default=None) is_verified: bool = Field(default=False) # 是否已验证 created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) diff --git a/app/database/lazer_user.py b/app/database/lazer_user.py index 3bc9ec3..e215dab 100644 --- a/app/database/lazer_user.py +++ b/app/database/lazer_user.py @@ -312,23 +312,7 @@ class UserResp(UserBase): ) ).one() redis = get_redis() - # 实时验证用户在线状态 - if obj.id is not None: - metadata_key = f"metadata:online:{obj.id}" - is_online_check = await redis.exists(metadata_key) - - # 如果metadata键不存在,立即从在线集合中清理该用户 - if not is_online_check: - try: - from app.service.realtime_online_cleanup import realtime_cleanup - await realtime_cleanup.verify_user_online_status(obj.id) - except Exception as e: - from app.log import logger - logger.warning(f"Failed to verify user {obj.id} online status: {e}") - - u.is_online = bool(is_online_check) - else: - u.is_online = False + u.is_online = await redis.exists(f"metadata:online:{obj.id}") u.cover_url = ( obj.cover.get( "url", "https://assets.ppy.sh/user-profile-covers/default.jpeg" @@ -418,27 +402,26 @@ class UserResp(UserBase): for ua in await obj.awaitable_attrs.achievement ] if "rank_history" in include: - if obj.id is not None: - rank_history = await RankHistoryResp.from_db(session, obj.id, ruleset) - if len(rank_history.data) != 0: - u.rank_history = rank_history + rank_history = await RankHistoryResp.from_db(session, obj.id, ruleset) + if len(rank_history.data) != 0: + u.rank_history = rank_history - rank_top = ( - await session.exec( - select(RankTop).where( - RankTop.user_id == obj.id, RankTop.mode == ruleset - ) + rank_top = ( + await session.exec( + select(RankTop).where( + RankTop.user_id == obj.id, RankTop.mode == ruleset ) - ).first() - if rank_top: - u.rank_highest = ( - RankHighest( - rank=rank_top.rank, - updated_at=datetime.combine(rank_top.date, datetime.min.time()), - ) - if rank_top - else None + ) + ).first() + if rank_top: + u.rank_highest = ( + RankHighest( + rank=rank_top.rank, + updated_at=datetime.combine(rank_top.date, datetime.min.time()), ) + if rank_top + else None + ) u.favourite_beatmapset_count = ( await session.exec( diff --git a/app/service/email_verification_service.py b/app/service/email_verification_service.py index 5207fa9..afe34ab 100644 --- a/app/service/email_verification_service.py +++ b/app/service/email_verification_service.py @@ -218,6 +218,10 @@ This email was sent automatically, please do not reply. # 生成新的验证码 code = EmailVerificationService.generate_verification_code() + # 解析用户代理字符串 + from app.utils import parse_user_agent + parsed_user_agent = parse_user_agent(user_agent, max_length=255) + # 创建验证记录 verification = EmailVerification( user_id=user_id, @@ -225,7 +229,7 @@ This email was sent automatically, please do not reply. verification_code=code, expires_at=datetime.now(UTC) + timedelta(minutes=10), # 10分钟过期 ip_address=ip_address, - user_agent=user_agent + user_agent=parsed_user_agent # 使用解析后的用户代理 ) db.add(verification) @@ -388,13 +392,18 @@ class LoginSessionService: is_new_location: bool = False ) -> LoginSession: """创建登录会话""" + from app.utils import parse_user_agent + + # 解析用户代理字符串,提取关键信息 + parsed_user_agent = parse_user_agent(user_agent, max_length=255) + session_token = EmailVerificationService.generate_session_token() session = LoginSession( user_id=user_id, session_token=session_token, ip_address=ip_address, - user_agent=user_agent, + user_agent=parsed_user_agent, # 使用解析后的用户代理 country_code=country_code, is_new_location=is_new_location, expires_at=datetime.now(UTC) + timedelta(hours=24), # 24小时过期 diff --git a/app/service/online_status_maintenance.py b/app/service/online_status_maintenance.py index e7a1b05..d49ef10 100644 --- a/app/service/online_status_maintenance.py +++ b/app/service/online_status_maintenance.py @@ -20,6 +20,7 @@ async def maintain_playing_users_online_status(): 定期刷新正在游玩用户的metadata在线标记, 确保他们在游玩过程中显示为在线状态。 + 但不会恢复已经退出的用户的在线状态。 """ redis_sync = get_redis_message() redis_async = get_redis() @@ -31,17 +32,25 @@ async def maintain_playing_users_online_status(): if not playing_users: return - logger.debug(f"Maintaining online status for {len(playing_users)} playing users") + logger.debug(f"Checking online status for {len(playing_users)} playing users") - # 为每个游玩用户刷新metadata在线标记 + # 仅为当前有效连接的用户刷新在线状态 + updated_count = 0 for user_id in playing_users: user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) metadata_key = f"metadata:online:{user_id_str}" - # 设置或刷新metadata在线标记,过期时间为1小时 - await redis_async.set(metadata_key, "playing", ex=3600) + # 重要:首先检查用户是否已经有在线标记,只有存在才刷新 + if await redis_async.exists(metadata_key): + # 只更新已经在线的用户的状态,不恢复已退出的用户 + await redis_async.set(metadata_key, "playing", ex=3600) + updated_count += 1 + else: + # 如果用户已退出(没有在线标记),则从游玩用户中移除 + await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, user_id) + logger.debug(f"Removed user {user_id_str} from playing users as they are offline") - logger.debug(f"Updated metadata online status for {len(playing_users)} playing users") + logger.debug(f"Updated metadata online status for {updated_count} playing users") except Exception as e: logger.error(f"Error maintaining playing users online status: {e}") diff --git a/app/service/realtime_online_cleanup.py b/app/service/realtime_online_cleanup.py index 3b0e38f..e69de29 100644 --- a/app/service/realtime_online_cleanup.py +++ b/app/service/realtime_online_cleanup.py @@ -1,196 +0,0 @@ -""" -实时在线状态清理服务 - -此模块提供实时的在线状态清理功能,确保用户在断开连接后立即从在线列表中移除。 -""" -from __future__ import annotations - -import asyncio -import json -from datetime import datetime, timedelta - -from app.dependencies.database import get_redis, get_redis_message -from app.log import logger -from app.router.v2.stats import ( - REDIS_ONLINE_USERS_KEY, - REDIS_PLAYING_USERS_KEY, - _redis_exec, -) - - -class RealtimeOnlineCleanup: - """实时在线状态清理器""" - - def __init__(self): - self._running = False - self._task = None - - async def start(self): - """启动实时清理服务""" - if self._running: - return - - self._running = True - self._task = asyncio.create_task(self._realtime_cleanup_loop()) - logger.info("[RealtimeOnlineCleanup] Started realtime online cleanup service") - - async def stop(self): - """停止实时清理服务""" - if not self._running: - return - - self._running = False - if self._task: - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - logger.info("[RealtimeOnlineCleanup] Stopped realtime online cleanup service") - - async def _realtime_cleanup_loop(self): - """实时清理循环 - 每30秒检查一次""" - while self._running: - try: - # 执行快速清理 - cleaned_count = await self._quick_cleanup_stale_users() - if cleaned_count > 0: - logger.debug(f"[RealtimeOnlineCleanup] Quick cleanup: removed {cleaned_count} stale users") - - # 等待30秒 - await asyncio.sleep(30) - - except asyncio.CancelledError: - break - except Exception as e: - logger.error(f"[RealtimeOnlineCleanup] Error in cleanup loop: {e}") - # 出错时等待30秒再重试 - await asyncio.sleep(30) - - async def _quick_cleanup_stale_users(self) -> int: - """快速清理过期用户,返回清理数量""" - redis_sync = get_redis_message() - redis_async = get_redis() - - total_cleaned = 0 - - try: - # 获取所有在线用户 - online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY) - - # 快速检查:只检查metadata键是否存在 - stale_users = [] - for user_id in online_users: - user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) - metadata_key = f"metadata:online:{user_id_str}" - - # 如果metadata标记不存在,立即标记为过期 - if not await redis_async.exists(metadata_key): - stale_users.append(user_id_str) - - # 立即清理过期用户 - if stale_users: - # 从在线用户集合中移除 - await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, *stale_users) - # 同时从游玩用户集合中移除(如果存在) - await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, *stale_users) - total_cleaned = len(stale_users) - - logger.info(f"[RealtimeOnlineCleanup] Immediately cleaned {total_cleaned} stale users") - - except Exception as e: - logger.error(f"[RealtimeOnlineCleanup] Error in quick cleanup: {e}") - - return total_cleaned - - async def verify_user_online_status(self, user_id: int) -> bool: - """实时验证用户在线状态""" - try: - redis = get_redis() - metadata_key = f"metadata:online:{user_id}" - - # 检查metadata键是否存在 - exists = await redis.exists(metadata_key) - - # 如果不存在,从在线集合中移除该用户 - if not exists: - redis_sync = get_redis_message() - await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, str(user_id)) - await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, str(user_id)) - logger.debug(f"[RealtimeOnlineCleanup] Verified user {user_id} is offline, removed from sets") - - return bool(exists) - - except Exception as e: - logger.error(f"[RealtimeOnlineCleanup] Error verifying user {user_id} status: {e}") - return False - - async def force_refresh_online_list(self): - """强制刷新整个在线用户列表""" - try: - redis_sync = get_redis_message() - redis_async = get_redis() - - # 获取所有在线用户 - online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY) - playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY) - - # 验证每个用户的在线状态 - valid_online_users = [] - valid_playing_users = [] - - for user_id in online_users: - user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) - metadata_key = f"metadata:online:{user_id_str}" - - if await redis_async.exists(metadata_key): - valid_online_users.append(user_id_str) - - for user_id in playing_users: - user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) - metadata_key = f"metadata:online:{user_id_str}" - - if await redis_async.exists(metadata_key): - valid_playing_users.append(user_id_str) - - # 重建在线用户集合 - if online_users: - await _redis_exec(redis_sync.delete, REDIS_ONLINE_USERS_KEY) - if valid_online_users: - await _redis_exec(redis_sync.sadd, REDIS_ONLINE_USERS_KEY, *valid_online_users) - # 设置过期时间 - await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3 * 3600) - - # 重建游玩用户集合 - if playing_users: - await _redis_exec(redis_sync.delete, REDIS_PLAYING_USERS_KEY) - if valid_playing_users: - await _redis_exec(redis_sync.sadd, REDIS_PLAYING_USERS_KEY, *valid_playing_users) - # 设置过期时间 - await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3 * 3600) - - cleaned_online = len(online_users) - len(valid_online_users) - cleaned_playing = len(playing_users) - len(valid_playing_users) - - if cleaned_online > 0 or cleaned_playing > 0: - logger.info(f"[RealtimeOnlineCleanup] Force refresh: removed {cleaned_online} stale online users, {cleaned_playing} stale playing users") - - return cleaned_online + cleaned_playing - - except Exception as e: - logger.error(f"[RealtimeOnlineCleanup] Error in force refresh: {e}") - return 0 - - -# 全局实例 -realtime_cleanup = RealtimeOnlineCleanup() - - -def start_realtime_cleanup(): - """启动实时清理服务""" - asyncio.create_task(realtime_cleanup.start()) - - -def stop_realtime_cleanup(): - """停止实时清理服务""" - asyncio.create_task(realtime_cleanup.stop()) diff --git a/app/service/stats_cleanup.py b/app/service/stats_cleanup.py index 0106e7c..a7c57d0 100644 --- a/app/service/stats_cleanup.py +++ b/app/service/stats_cleanup.py @@ -11,14 +11,13 @@ from app.router.v2.stats import ( ) -async def cleanup_stale_online_users() -> tuple[int, int, int]: - """清理过期的在线和游玩用户,返回清理的用户数(online_cleaned, playing_cleaned, metadata_cleaned)""" +async def cleanup_stale_online_users() -> tuple[int, int]: + """清理过期的在线和游玩用户,返回清理的用户数""" redis_sync = get_redis_message() redis_async = get_redis() online_cleaned = 0 playing_cleaned = 0 - metadata_cleaned = 0 try: # 获取所有在线用户 @@ -72,63 +71,10 @@ async def cleanup_stale_online_users() -> tuple[int, int, int]: playing_cleaned = len(stale_playing_users) logger.info(f"Cleaned {playing_cleaned} stale playing users") - # 新增:清理过期的metadata在线标记 - # 这个步骤用于清理那些由于异常断开连接而没有被正常清理的metadata键 - metadata_cleaned = 0 - try: - # 查找所有metadata:online:*键 - metadata_keys = [] - cursor = 0 - pattern = "metadata:online:*" - - # 使用SCAN命令遍历所有匹配的键 - while True: - cursor, keys = await redis_async.scan(cursor=cursor, match=pattern, count=100) - metadata_keys.extend(keys) - if cursor == 0: - break - - # 检查这些键是否对应有效的在线用户 - orphaned_metadata_keys = [] - for key in metadata_keys: - if isinstance(key, bytes): - key_str = key.decode() - else: - key_str = key - - # 从键名中提取用户ID - user_id = key_str.replace("metadata:online:", "") - - # 检查用户是否在在线用户集合中 - is_in_online_set = await _redis_exec(redis_sync.sismember, REDIS_ONLINE_USERS_KEY, user_id) - is_in_playing_set = await _redis_exec(redis_sync.sismember, REDIS_PLAYING_USERS_KEY, user_id) - - # 如果用户既不在在线集合也不在游玩集合中,检查TTL - if not is_in_online_set and not is_in_playing_set: - # 检查键的TTL - ttl = await redis_async.ttl(key_str) - # TTL < 0 表示键没有过期时间或已过期 - # 我们只清理那些明确过期或没有设置TTL的键 - if ttl < 0: - # 再次确认键确实存在且没有对应的活跃连接 - key_value = await redis_async.get(key_str) - if key_value: - # 键存在但用户不在任何集合中,且没有有效TTL,可以安全删除 - orphaned_metadata_keys.append(key_str) - - # 清理孤立的metadata键 - if orphaned_metadata_keys: - await redis_async.delete(*orphaned_metadata_keys) - metadata_cleaned = len(orphaned_metadata_keys) - logger.info(f"Cleaned {metadata_cleaned} orphaned metadata:online keys") - - except Exception as e: - logger.error(f"Error cleaning orphaned metadata keys: {e}") - except Exception as e: logger.error(f"Error cleaning stale users: {e}") - return online_cleaned, playing_cleaned, metadata_cleaned + return online_cleaned, playing_cleaned async def refresh_redis_key_expiry() -> None: diff --git a/app/service/stats_scheduler.py b/app/service/stats_scheduler.py index ead52ca..cef88d1 100644 --- a/app/service/stats_scheduler.py +++ b/app/service/stats_scheduler.py @@ -134,10 +134,10 @@ class StatsScheduler: """清理循环 - 每10分钟清理一次过期用户""" # 启动时立即执行一次清理 try: - online_cleaned, playing_cleaned, metadata_cleaned = await cleanup_stale_online_users() - if online_cleaned > 0 or playing_cleaned > 0 or metadata_cleaned > 0: + online_cleaned, playing_cleaned = await cleanup_stale_online_users() + if online_cleaned > 0 or playing_cleaned > 0: logger.info( - f"Initial cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users, {metadata_cleaned} orphaned metadata keys" + f"Initial cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users" ) await refresh_redis_key_expiry() @@ -153,10 +153,10 @@ class StatsScheduler: try: # 清理过期用户 - online_cleaned, playing_cleaned, metadata_cleaned = await cleanup_stale_online_users() - if online_cleaned > 0 or playing_cleaned > 0 or metadata_cleaned > 0: + online_cleaned, playing_cleaned = await cleanup_stale_online_users() + if online_cleaned > 0 or playing_cleaned > 0: logger.info( - f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users, {metadata_cleaned} orphaned metadata keys" + f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users" ) # 刷新Redis key过期时间 diff --git a/app/signalr/hub/metadata.py b/app/signalr/hub/metadata.py index 8335f0d..757e9c8 100644 --- a/app/signalr/hub/metadata.py +++ b/app/signalr/hub/metadata.py @@ -202,11 +202,6 @@ class MetadataHub(Hub[MetadataClientState]): if store.status is not None and store.status == status_: return store.status = OnlineStatus(status_) - - # 刷新用户在线状态 - from app.service.online_status_manager import online_status_manager - await online_status_manager.refresh_user_online_status(user_id, f"status_{status_.name.lower()}") - tasks = self.broadcast_tasks(user_id, store) tasks.add( self.call_noblock( @@ -224,12 +219,6 @@ class MetadataHub(Hub[MetadataClientState]): user_id = int(client.connection_id) store = self.get_or_create_state(client) store.activity = activity - - # 刷新用户在线状态 - from app.service.online_status_manager import online_status_manager - activity_type = type(activity).__name__ if activity else 'active' - await online_status_manager.refresh_user_online_status(user_id, f"activity_{activity_type}") - tasks = self.broadcast_tasks(user_id, store) tasks.add( self.call_noblock( diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index 2ea759a..f8d8067 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -2,7 +2,9 @@ from __future__ import annotations import asyncio from datetime import UTC, datetime, timedelta -from typing import override +from typing import override, Dict, List, Optional, Tuple +import json +from collections import defaultdict, deque from app.database import Room from app.database.beatmap import Beatmap @@ -54,6 +56,152 @@ from sqlmodel import col, exists, select GAMEPLAY_LOAD_TIMEOUT = 30 +class GameplayStateBuffer: + """游戏状态缓冲区,用于管理实时排行榜和观战数据同步""" + + def __init__(self): + # 房间ID -> 用户分数数据缓冲区 + self.score_buffers: Dict[int, Dict[int, deque]] = defaultdict(lambda: defaultdict(lambda: deque(maxlen=50))) + # 房间ID -> 实时排行榜数据 + self.leaderboards: Dict[int, List[Dict]] = defaultdict(list) + # 房间ID -> 游戏状态快照 + self.gameplay_snapshots: Dict[int, Dict] = {} + # 用户观战状态缓存 + self.spectator_states: Dict[Tuple[int, int], Dict] = {} # (room_id, user_id) -> state + + async def add_score_frame(self, room_id: int, user_id: int, frame_data: Dict): + """添加分数帧数据到缓冲区""" + self.score_buffers[room_id][user_id].append({ + **frame_data, + 'timestamp': datetime.now(UTC), + 'user_id': user_id + }) + + # 更新实时排行榜 + await self._update_leaderboard(room_id) + + async def _update_leaderboard(self, room_id: int): + """更新实时排行榜""" + leaderboard = [] + + for user_id, frames in self.score_buffers[room_id].items(): + if not frames: + continue + + latest_frame = frames[-1] + leaderboard.append({ + 'user_id': user_id, + 'score': latest_frame.get('score', 0), + 'combo': latest_frame.get('combo', 0), + 'accuracy': latest_frame.get('accuracy', 0.0), + 'completed': latest_frame.get('completed', False), + 'timestamp': latest_frame['timestamp'] + }) + + # 按分数排序 + leaderboard.sort(key=lambda x: (-x['score'], -x['accuracy'])) + self.leaderboards[room_id] = leaderboard + + def get_leaderboard(self, room_id: int) -> List[Dict]: + """获取房间实时排行榜""" + return self.leaderboards.get(room_id, []) + + async def create_gameplay_snapshot(self, room_id: int, room_data: Dict): + """创建游戏状态快照用于新加入的观众""" + snapshot = { + 'room_id': room_id, + 'state': room_data.get('state'), + 'current_item': room_data.get('current_item'), + 'users': room_data.get('users', []), + 'leaderboard': self.get_leaderboard(room_id), + 'created_at': datetime.now(UTC) + } + self.gameplay_snapshots[room_id] = snapshot + return snapshot + + def get_gameplay_snapshot(self, room_id: int) -> Optional[Dict]: + """获取游戏状态快照""" + return self.gameplay_snapshots.get(room_id) + + async def set_spectator_state(self, room_id: int, user_id: int, state_data: Dict): + """设置观战者状态""" + key = (room_id, user_id) + self.spectator_states[key] = { + **state_data, + 'last_updated': datetime.now(UTC) + } + + def get_spectator_state(self, room_id: int, user_id: int) -> Optional[Dict]: + """获取观战者状态""" + key = (room_id, user_id) + return self.spectator_states.get(key) + + async def cleanup_room(self, room_id: int): + """清理房间相关数据""" + self.score_buffers.pop(room_id, None) + self.leaderboards.pop(room_id, None) + self.gameplay_snapshots.pop(room_id, None) + + # 清理观战者状态 + keys_to_remove = [key for key in self.spectator_states.keys() if key[0] == room_id] + for key in keys_to_remove: + self.spectator_states.pop(key, None) + + +class SpectatorSyncManager: + """观战同步管理器,处理跨Hub通信""" + + def __init__(self, redis_client): + self.redis = redis_client + self.channel_prefix = "multiplayer_spectator" + + async def notify_spectator_hubs(self, room_id: int, event_type: str, data: Dict): + """通知观战Hub游戏状态变化""" + message = { + 'room_id': room_id, + 'event_type': event_type, + 'data': data, + 'timestamp': datetime.now(UTC).isoformat() + } + + channel = f"{self.channel_prefix}:room:{room_id}" + await self.redis.publish(channel, json.dumps(message)) + + async def notify_gameplay_started(self, room_id: int, game_data: Dict): + """通知游戏开始""" + await self.notify_spectator_hubs(room_id, "gameplay_started", game_data) + + async def notify_gameplay_ended(self, room_id: int, results_data: Dict): + """通知游戏结束""" + await self.notify_spectator_hubs(room_id, "gameplay_ended", results_data) + + async def notify_user_state_change(self, room_id: int, user_id: int, old_state: str, new_state: str): + """通知用户状态变化""" + await self.notify_spectator_hubs(room_id, "user_state_changed", { + 'user_id': user_id, + 'old_state': old_state, + 'new_state': new_state + }) + + async def subscribe_to_spectator_events(self, callback): + """订阅观战事件""" + pattern = f"{self.channel_prefix}:*" + pubsub = self.redis.pubsub() + await pubsub.psubscribe(pattern) + + async for message in pubsub.listen(): + if message['type'] == 'pmessage': + try: + data = json.loads(message['data']) + await callback(message['channel'], data) + except Exception as e: + logger.error(f"Error processing spectator event: {e}") + + +# 全局实例 +gameplay_buffer = GameplayStateBuffer() + + class MultiplayerEventLogger: def __init__(self): pass @@ -148,6 +296,79 @@ class MultiplayerHub(Hub[MultiplayerClientState]): super().__init__() self.rooms: dict[int, ServerMultiplayerRoom] = {} self.event_logger = MultiplayerEventLogger() + self.spectator_sync_manager: Optional[SpectatorSyncManager] = None + # 实时数据推送任务管理 + self.leaderboard_tasks: Dict[int, asyncio.Task] = {} + # 观战状态同步任务 + self.spectator_sync_tasks: Dict[int, asyncio.Task] = {} + + async def initialize_managers(self): + """初始化管理器""" + if not self.spectator_sync_manager: + redis = get_redis() + self.spectator_sync_manager = SpectatorSyncManager(redis) + + # 启动观战事件监听 + asyncio.create_task(self.spectator_sync_manager.subscribe_to_spectator_events( + self._handle_spectator_event + )) + + async def _handle_spectator_event(self, channel: str, data: Dict): + """处理观战事件""" + try: + room_id = data.get('room_id') + event_type = data.get('event_type') + event_data = data.get('data', {}) + + if room_id and event_type and room_id in self.rooms: + server_room = self.rooms[room_id] + await self._process_spectator_event(server_room, event_type, event_data) + except Exception as e: + logger.error(f"Error handling spectator event: {e}") + + async def _process_spectator_event(self, server_room: ServerMultiplayerRoom, event_type: str, event_data: Dict): + """处理具体的观战事件""" + room_id = server_room.room.room_id + + if event_type == "spectator_joined": + user_id = event_data.get('user_id') + if user_id: + await self._sync_spectator_with_current_state(room_id, user_id) + + elif event_type == "request_leaderboard": + user_id = event_data.get('user_id') + if user_id: + leaderboard = gameplay_buffer.get_leaderboard(room_id) + await self._send_leaderboard_to_spectator(user_id, leaderboard) + + async def _sync_spectator_with_current_state(self, room_id: int, user_id: int): + """同步观战者与当前游戏状态""" + try: + snapshot = gameplay_buffer.get_gameplay_snapshot(room_id) + if snapshot: + # 通过Redis发送状态同步信息给SpectatorHub + redis = get_redis() + sync_data = { + 'target_user': user_id, + 'snapshot': snapshot, + 'timestamp': datetime.now(UTC).isoformat() + } + await redis.publish(f"spectator_sync:{room_id}", json.dumps(sync_data)) + except Exception as e: + logger.error(f"Error syncing spectator {user_id} with room {room_id}: {e}") + + async def _send_leaderboard_to_spectator(self, user_id: int, leaderboard: List[Dict]): + """发送排行榜数据给观战者""" + try: + redis = get_redis() + leaderboard_data = { + 'target_user': user_id, + 'leaderboard': leaderboard, + 'timestamp': datetime.now(UTC).isoformat() + } + await redis.publish(f"leaderboard_update:{user_id}", json.dumps(leaderboard_data)) + except Exception as e: + logger.error(f"Error sending leaderboard to spectator {user_id}: {e}") @staticmethod def group_id(room: int) -> str: @@ -271,6 +492,10 @@ class MultiplayerHub(Hub[MultiplayerClientState]): async def JoinRoomWithPassword(self, client: Client, room_id: int, password: str): logger.info(f"[MultiplayerHub] {client.user_id} joining room {room_id}") + + # 初始化管理器 + await self.initialize_managers() + store = self.get_or_create_state(client) if store.room_id != 0: raise InvokeException("You are already in a room") @@ -293,9 +518,13 @@ class MultiplayerHub(Hub[MultiplayerClientState]): self.add_to_group(client, self.group_id(room_id)) await server_room.match_type_handler.handle_join(user) - # Critical fix: Send current room and gameplay state to new user + # Enhanced: Send current room and gameplay state to new user # This ensures spectators joining ongoing games get proper state sync await self._send_room_state_to_new_user(client, server_room) + + # 如果正在进行游戏,同步游戏状态 + if room.state in [MultiplayerRoomState.PLAYING, MultiplayerRoomState.WAITING_FOR_LOAD]: + await self._sync_new_user_with_gameplay(client, server_room) await self.event_logger.player_joined(room_id, user.user_id) @@ -327,6 +556,42 @@ class MultiplayerHub(Hub[MultiplayerClientState]): redis = get_redis() await redis.publish("chat:room:joined", f"{room.channel_id}:{user.user_id}") + + # 通知观战Hub有新用户加入 + if self.spectator_sync_manager: + await self.spectator_sync_manager.notify_spectator_hubs( + room_id, "user_joined", {'user_id': user.user_id} + ) + + return room + + async def _sync_new_user_with_gameplay(self, client: Client, room: ServerMultiplayerRoom): + """同步新用户与正在进行的游戏状态""" + try: + room_id = room.room.room_id + + # 获取游戏状态快照 + snapshot = gameplay_buffer.get_gameplay_snapshot(room_id) + if not snapshot: + # 创建新的快照 + room_data = { + 'state': room.room.state, + 'current_item': room.queue.current_item, + 'users': [{'user_id': u.user_id, 'state': u.state} for u in room.room.users] + } + snapshot = await gameplay_buffer.create_gameplay_snapshot(room_id, room_data) + + # 发送游戏状态到新用户 + await self.broadcast_call(client.connection_id, "GameplayStateSync", snapshot) + + # 发送实时排行榜 + leaderboard = gameplay_buffer.get_leaderboard(room_id) + if leaderboard: + await self.broadcast_call(client.connection_id, "LeaderboardUpdate", leaderboard) + + logger.info(f"[MultiplayerHub] Synced gameplay state for user {client.user_id} in room {room_id}") + except Exception as e: + logger.error(f"Error syncing new user with gameplay: {e}") return room @@ -704,14 +969,32 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if user.state == state: return + # 记录状态变化用于观战同步 + old_state = user.state + # Special handling for state changes during gameplay match state: case MultiplayerUserState.IDLE: if user.state.is_playing: + # 玩家退出游戏时,清理分数缓冲区 + room_id = room.room_id + if room_id in gameplay_buffer.score_buffers: + gameplay_buffer.score_buffers[room_id].pop(user.user_id, None) + await gameplay_buffer._update_leaderboard(room_id) + await self._broadcast_leaderboard_update(server_room) return case MultiplayerUserState.LOADED | MultiplayerUserState.READY_FOR_GAMEPLAY: if not user.state.is_playing: return + case MultiplayerUserState.PLAYING: + # 开始游戏时初始化分数缓冲区 + room_id = room.room_id + await gameplay_buffer.add_score_frame(room_id, user.user_id, { + 'score': 0, + 'combo': 0, + 'accuracy': 100.0, + 'completed': False + }) logger.info( f"[MultiplayerHub] User {user.user_id} changing state from {user.state} to {state}" @@ -729,8 +1012,63 @@ class MultiplayerHub(Hub[MultiplayerClientState]): if state == MultiplayerUserState.SPECTATING: await self.handle_spectator_state_change(client, server_room, user) + # 通知观战Hub状态变化 + if self.spectator_sync_manager: + await self.spectator_sync_manager.notify_user_state_change( + room.room_id, user.user_id, old_state.name, state.name + ) + await self.update_room_state(server_room) + async def _broadcast_leaderboard_update(self, room: ServerMultiplayerRoom): + """广播实时排行榜更新""" + try: + room_id = room.room.room_id + leaderboard = gameplay_buffer.get_leaderboard(room_id) + + if leaderboard: + await self.broadcast_group_call( + self.group_id(room_id), + "LeaderboardUpdate", + leaderboard + ) + + logger.debug(f"[MultiplayerHub] Broadcasted leaderboard update to room {room_id}") + except Exception as e: + logger.error(f"Error broadcasting leaderboard update: {e}") + + async def _start_leaderboard_broadcast_task(self, room_id: int): + """启动实时排行榜广播任务""" + if room_id in self.leaderboard_tasks: + return + + async def leaderboard_broadcast_loop(): + try: + while room_id in self.rooms and room_id in self.leaderboard_tasks: + if room_id in self.rooms: + server_room = self.rooms[room_id] + if server_room.room.state == MultiplayerRoomState.PLAYING: + await self._broadcast_leaderboard_update(server_room) + + await asyncio.sleep(1.0) # 每秒更新一次排行榜 + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error in leaderboard broadcast loop for room {room_id}: {e}") + + task = asyncio.create_task(leaderboard_broadcast_loop()) + self.leaderboard_tasks[room_id] = task + + async def _stop_leaderboard_broadcast_task(self, room_id: int): + """停止实时排行榜广播任务""" + if room_id in self.leaderboard_tasks: + task = self.leaderboard_tasks.pop(room_id) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + async def change_user_state( self, room: ServerMultiplayerRoom, @@ -1017,16 +1355,44 @@ class MultiplayerHub(Hub[MultiplayerClientState]): async def change_room_state( self, room: ServerMultiplayerRoom, state: MultiplayerRoomState ): + old_state = room.room.state + room_id = room.room.room_id + logger.debug( - f"[MultiplayerHub] Room {room.room.room_id} state " - f"changed from {room.room.state} to {state}" + f"[MultiplayerHub] Room {room_id} state " + f"changed from {old_state} to {state}" ) + room.room.state = state await self.broadcast_group_call( - self.group_id(room.room.room_id), + self.group_id(room_id), "RoomStateChanged", state, ) + + # 处理状态变化的特殊逻辑 + if old_state == MultiplayerRoomState.PLAYING and state == MultiplayerRoomState.OPEN: + # 游戏结束,停止实时排行榜广播 + await self._stop_leaderboard_broadcast_task(room_id) + + # 发送最终排行榜 + leaderboard = gameplay_buffer.get_leaderboard(room_id) + if leaderboard: + await self.broadcast_group_call( + self.group_id(room_id), + "FinalLeaderboard", + leaderboard + ) + + # 通知观战Hub游戏结束 + if self.spectator_sync_manager: + await self.spectator_sync_manager.notify_gameplay_ended(room_id, { + 'leaderboard': leaderboard + }) + + elif state == MultiplayerRoomState.PLAYING: + # 游戏开始,启动实时排行榜 + await self._start_leaderboard_broadcast_task(room_id) async def StartMatch(self, client: Client): server_room = self._ensure_in_room(client) @@ -1099,6 +1465,8 @@ class MultiplayerHub(Hub[MultiplayerClientState]): await room.stop_all_countdowns(ForceGameplayStartCountdown) playing = False played_user = 0 + room_id = room.room.room_id + for user in room.room.users: client = self.get_client_by_id(str(user.user_id)) if client is None: @@ -1112,6 +1480,15 @@ class MultiplayerHub(Hub[MultiplayerClientState]): played_user += 1 await self.change_user_state(room, user, MultiplayerUserState.PLAYING) await self.call_noblock(client, "GameplayStarted") + + # 初始化玩家分数缓冲区 + await gameplay_buffer.add_score_frame(room_id, user.user_id, { + 'score': 0, + 'combo': 0, + 'accuracy': 100.0, + 'completed': False + }) + elif user.state == MultiplayerUserState.WAITING_FOR_LOAD: await self.change_user_state(room, user, MultiplayerUserState.IDLE) await self.broadcast_group_call( @@ -1119,11 +1496,28 @@ class MultiplayerHub(Hub[MultiplayerClientState]): "GameplayAborted", GameplayAbortReason.LOAD_TOOK_TOO_LONG, ) + await self.change_room_state( room, (MultiplayerRoomState.PLAYING if playing else MultiplayerRoomState.OPEN), ) + if playing: + # 创建游戏状态快照 + room_data = { + 'state': room.room.state, + 'current_item': room.queue.current_item, + 'users': [{'user_id': u.user_id, 'state': u.state} for u in room.room.users] + } + await gameplay_buffer.create_gameplay_snapshot(room_id, room_data) + + # 启动实时排行榜广播 + await self._start_leaderboard_broadcast_task(room_id) + + # 通知观战Hub游戏开始 + if self.spectator_sync_manager: + await self.spectator_sync_manager.notify_gameplay_started(room_id, room_data) + redis = get_redis() await redis.set( f"multiplayer:{room.room.room_id}:gameplay:players", @@ -1224,8 +1618,86 @@ class MultiplayerHub(Hub[MultiplayerClientState]): room.room.room_id, room.room.host.user_id, ) - del self.rooms[room.room.room_id] - logger.info(f"[MultiplayerHub] Room {room.room.room_id} ended") + + room_id = room.room.room_id + + # 清理实时数据 + await self._stop_leaderboard_broadcast_task(room_id) + await gameplay_buffer.cleanup_room(room_id) + + # 清理观战同步任务 + if room_id in self.spectator_sync_tasks: + task = self.spectator_sync_tasks.pop(room_id) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + del self.rooms[room_id] + logger.info(f"[MultiplayerHub] Room {room_id} ended") + + async def UpdateScore(self, client: Client, score_data: Dict): + """接收并处理实时分数更新""" + try: + server_room = self._ensure_in_room(client) + room = server_room.room + user = next((u for u in room.users if u.user_id == client.user_id), None) + + if user is None: + raise InvokeException("User not found in room") + + if room.state != MultiplayerRoomState.PLAYING: + return + + if user.state != MultiplayerUserState.PLAYING: + return + + room_id = room.room_id + + # 添加分数帧到缓冲区 + await gameplay_buffer.add_score_frame(room_id, client.user_id, { + 'score': score_data.get('score', 0), + 'combo': score_data.get('combo', 0), + 'accuracy': score_data.get('accuracy', 0.0), + 'completed': score_data.get('completed', False), + 'hp': score_data.get('hp', 1.0), + 'position': score_data.get('position', 0) + }) + + except Exception as e: + logger.error(f"Error updating score for user {client.user_id}: {e}") + + async def GetLeaderboard(self, client: Client) -> List[Dict]: + """获取当前房间的实时排行榜""" + try: + server_room = self._ensure_in_room(client) + room_id = server_room.room.room_id + return gameplay_buffer.get_leaderboard(room_id) + except Exception as e: + logger.error(f"Error getting leaderboard for user {client.user_id}: {e}") + return [] + + async def RequestSpectatorSync(self, client: Client): + """观战者请求同步当前游戏状态""" + try: + server_room = self._ensure_in_room(client) + room_id = server_room.room.room_id + + # 发送游戏状态快照 + snapshot = gameplay_buffer.get_gameplay_snapshot(room_id) + if snapshot: + await self.broadcast_call(client.connection_id, "GameplayStateSync", snapshot) + + # 发送当前排行榜 + leaderboard = gameplay_buffer.get_leaderboard(room_id) + if leaderboard: + await self.broadcast_call(client.connection_id, "LeaderboardUpdate", leaderboard) + + logger.info(f"[MultiplayerHub] Sent spectator sync to user {client.user_id}") + + except Exception as e: + logger.error(f"Error handling spectator sync request: {e}") async def LeaveRoom(self, client: Client): store = self.get_or_create_state(client) diff --git a/app/signalr/hub/spectator_multiplayer_integration.py b/app/signalr/hub/spectator_multiplayer_integration.py new file mode 100644 index 0000000..4bf517f --- /dev/null +++ b/app/signalr/hub/spectator_multiplayer_integration.py @@ -0,0 +1,251 @@ +""" +SpectatorHub增强补丁 +与MultiplayerHub集成以支持多人游戏观战 +""" + +from __future__ import annotations +import asyncio +import json +from datetime import datetime, UTC +from typing import Dict, Optional +from collections import defaultdict + +from app.dependencies.database import get_redis +from app.log import logger + + +class SpectatorMultiplayerIntegration: + """SpectatorHub的多人游戏集成扩展""" + + def __init__(self, spectator_hub_instance): + self.hub = spectator_hub_instance + self.redis = None + self.multiplayer_subscribers = defaultdict(set) # room_id -> set of connection_ids + self.leaderboard_cache = {} # room_id -> leaderboard data + + async def initialize(self): + """初始化多人游戏集成""" + self.redis = get_redis() + + # 启动Redis订阅任务 + asyncio.create_task(self._subscribe_to_multiplayer_events()) + asyncio.create_task(self._subscribe_to_leaderboard_updates()) + asyncio.create_task(self._subscribe_to_spectator_sync()) + + async def _subscribe_to_multiplayer_events(self): + """订阅多人游戏事件""" + try: + pubsub = self.redis.pubsub() + await pubsub.psubscribe("multiplayer_spectator:*") + + async for message in pubsub.listen(): + if message['type'] == 'pmessage': + try: + data = json.loads(message['data']) + await self._handle_multiplayer_event(message['channel'], data) + except Exception as e: + logger.error(f"Error processing multiplayer event: {e}") + except Exception as e: + logger.error(f"Error in multiplayer events subscription: {e}") + + async def _subscribe_to_leaderboard_updates(self): + """订阅排行榜更新""" + try: + pubsub = self.redis.pubsub() + await pubsub.psubscribe("leaderboard_update:*") + + async for message in pubsub.listen(): + if message['type'] == 'pmessage': + try: + data = json.loads(message['data']) + user_id = message['channel'].split(':')[-1] + await self._send_leaderboard_to_user(int(user_id), data['leaderboard']) + except Exception as e: + logger.error(f"Error processing leaderboard update: {e}") + except Exception as e: + logger.error(f"Error in leaderboard updates subscription: {e}") + + async def _subscribe_to_spectator_sync(self): + """订阅观战同步事件""" + try: + pubsub = self.redis.pubsub() + await pubsub.psubscribe("spectator_sync:*") + + async for message in pubsub.listen(): + if message['type'] == 'pmessage': + try: + data = json.loads(message['data']) + room_id = message['channel'].split(':')[-1] + await self._handle_spectator_sync(int(room_id), data) + except Exception as e: + logger.error(f"Error processing spectator sync: {e}") + except Exception as e: + logger.error(f"Error in spectator sync subscription: {e}") + + async def _handle_multiplayer_event(self, channel: str, data: Dict): + """处理多人游戏事件""" + room_id = data.get('room_id') + event_type = data.get('event_type') + event_data = data.get('data', {}) + + if not room_id or not event_type: + return + + if event_type == "gameplay_started": + await self._handle_gameplay_started(room_id, event_data) + elif event_type == "gameplay_ended": + await self._handle_gameplay_ended(room_id, event_data) + elif event_type == "user_state_changed": + await self._handle_user_state_changed(room_id, event_data) + + async def _handle_gameplay_started(self, room_id: int, game_data: Dict): + """处理游戏开始事件""" + logger.info(f"[SpectatorHub] Multiplayer game started in room {room_id}") + + # 通知所有观战该房间的用户 + if room_id in self.multiplayer_subscribers: + for connection_id in self.multiplayer_subscribers[room_id]: + await self._send_to_connection(connection_id, "MultiplayerGameStarted", game_data) + + async def _handle_gameplay_ended(self, room_id: int, results_data: Dict): + """处理游戏结束事件""" + logger.info(f"[SpectatorHub] Multiplayer game ended in room {room_id}") + + # 发送最终结果给观战者 + if room_id in self.multiplayer_subscribers: + for connection_id in self.multiplayer_subscribers[room_id]: + await self._send_to_connection(connection_id, "MultiplayerGameEnded", results_data) + + async def _handle_user_state_changed(self, room_id: int, state_data: Dict): + """处理用户状态变化""" + user_id = state_data.get('user_id') + new_state = state_data.get('new_state') + + if room_id in self.multiplayer_subscribers: + for connection_id in self.multiplayer_subscribers[room_id]: + await self._send_to_connection(connection_id, "MultiplayerUserStateChanged", { + 'user_id': user_id, + 'state': new_state + }) + + async def _handle_spectator_sync(self, room_id: int, sync_data: Dict): + """处理观战同步请求""" + target_user = sync_data.get('target_user') + snapshot = sync_data.get('snapshot') + + if target_user and snapshot: + # 找到目标用户的连接并发送快照 + client = self.hub.get_client_by_id(str(target_user)) + if client: + await self._send_to_connection(client.connection_id, "MultiplayerSnapshot", snapshot) + + async def _send_leaderboard_to_user(self, user_id: int, leaderboard: list): + """发送排行榜给指定用户""" + client = self.hub.get_client_by_id(str(user_id)) + if client: + await self._send_to_connection(client.connection_id, "MultiplayerLeaderboard", leaderboard) + + async def _send_to_connection(self, connection_id: str, method: str, data): + """发送数据到指定连接""" + try: + await self.hub.broadcast_call(connection_id, method, data) + except Exception as e: + logger.error(f"Error sending {method} to connection {connection_id}: {e}") + + async def subscribe_to_multiplayer_room(self, connection_id: str, room_id: int): + """订阅多人游戏房间的观战""" + self.multiplayer_subscribers[room_id].add(connection_id) + + # 通知MultiplayerHub有新的观战者 + await self.redis.publish(f"multiplayer_spectator:room:{room_id}", json.dumps({ + 'event_type': 'spectator_joined', + 'data': {'user_id': self._get_user_id_from_connection(connection_id)}, + 'room_id': room_id, + 'timestamp': datetime.now(UTC).isoformat() + })) + + logger.info(f"[SpectatorHub] Connection {connection_id} subscribed to multiplayer room {room_id}") + + async def unsubscribe_from_multiplayer_room(self, connection_id: str, room_id: int): + """取消订阅多人游戏房间""" + if room_id in self.multiplayer_subscribers: + self.multiplayer_subscribers[room_id].discard(connection_id) + if not self.multiplayer_subscribers[room_id]: + del self.multiplayer_subscribers[room_id] + + logger.info(f"[SpectatorHub] Connection {connection_id} unsubscribed from multiplayer room {room_id}") + + async def request_multiplayer_leaderboard(self, connection_id: str, room_id: int): + """请求多人游戏排行榜""" + user_id = self._get_user_id_from_connection(connection_id) + if user_id: + await self.redis.publish(f"multiplayer_spectator:room:{room_id}", json.dumps({ + 'event_type': 'request_leaderboard', + 'data': {'user_id': user_id}, + 'room_id': room_id, + 'timestamp': datetime.now(UTC).isoformat() + })) + + def _get_user_id_from_connection(self, connection_id: str) -> Optional[int]: + """从连接ID获取用户ID""" + # 这需要根据实际的SpectatorHub实现来调整 + for client in self.hub.clients.values(): + if client.connection_id == connection_id: + return client.user_id + return None + + +# 在SpectatorHub类中添加的方法 +async def init_multiplayer_integration(self): + """初始化多人游戏集成""" + if not hasattr(self, 'multiplayer_integration'): + self.multiplayer_integration = SpectatorMultiplayerIntegration(self) + await self.multiplayer_integration.initialize() + +async def WatchMultiplayerRoom(self, client, room_id: int): + """开始观战多人游戏房间""" + try: + if not hasattr(self, 'multiplayer_integration'): + await self.init_multiplayer_integration() + + await self.multiplayer_integration.subscribe_to_multiplayer_room( + client.connection_id, room_id + ) + + # 请求当前状态同步 + await self.multiplayer_integration.request_multiplayer_leaderboard( + client.connection_id, room_id + ) + + return {"success": True, "message": f"Now watching multiplayer room {room_id}"} + except Exception as e: + logger.error(f"Error starting multiplayer room watch: {e}") + raise InvokeException(f"Failed to watch multiplayer room: {e}") + +async def StopWatchingMultiplayerRoom(self, client, room_id: int): + """停止观战多人游戏房间""" + try: + if hasattr(self, 'multiplayer_integration'): + await self.multiplayer_integration.unsubscribe_from_multiplayer_room( + client.connection_id, room_id + ) + + return {"success": True, "message": f"Stopped watching multiplayer room {room_id}"} + except Exception as e: + logger.error(f"Error stopping multiplayer room watch: {e}") + raise InvokeException(f"Failed to stop watching multiplayer room: {e}") + +async def RequestMultiplayerLeaderboard(self, client, room_id: int): + """请求多人游戏实时排行榜""" + try: + if not hasattr(self, 'multiplayer_integration'): + await self.init_multiplayer_integration() + + await self.multiplayer_integration.request_multiplayer_leaderboard( + client.connection_id, room_id + ) + + return {"success": True, "message": "Leaderboard request sent"} + except Exception as e: + logger.error(f"Error requesting multiplayer leaderboard: {e}") + raise InvokeException(f"Failed to request leaderboard: {e}") diff --git a/app/utils.py b/app/utils.py index e8e932d..29642fd 100644 --- a/app/utils.py +++ b/app/utils.py @@ -124,3 +124,60 @@ def truncate(text: str, limit: int = 100, ellipsis: str = "...") -> str: if len(text) > limit: return text[:limit] + ellipsis return text + + +def parse_user_agent(user_agent: str | None, max_length: int = 255) -> str | None: + """ + 解析用户代理字符串,提取关键信息:设备、系统、浏览器 + + 参数: + user_agent: 用户代理字符串 + max_length: 最大长度限制 + + 返回: + 简化后的用户代理字符串 + """ + if user_agent is None: + return None + + # 检查是否是 osu! 客户端 + if "osu!" in user_agent.lower(): + return "osu!" + + # 提取关键信息 + parsed_info = [] + + # 提取设备信息 + device_matches = [ + # 常见移动设备型号 + r"(iPhone|iPad|iPod|Android|ALI-AN00|SM-\w+|MI \w+|Redmi|HUAWEI|HONOR|POCO)", + # 其他设备关键词 + r"(Windows NT|Macintosh|Linux|Ubuntu)" + ] + + import re + for pattern in device_matches: + matches = re.findall(pattern, user_agent) + if matches: + parsed_info.extend(matches) + + # 提取浏览器信息 + browser_matches = [ + r"(Chrome|Firefox|Safari|Edge|MSIE|MQQBrowser|MiuiBrowser|OPR|Opera)", + r"(WebKit|Gecko|Trident)" + ] + + for pattern in browser_matches: + matches = re.findall(pattern, user_agent) + if matches: + # 只取第一个匹配的浏览器 + parsed_info.append(matches[0]) + break + + # 组合信息 + if parsed_info: + result = " / ".join(set(parsed_info)) + return truncate(result, max_length - 3, "...") + + # 如果无法解析,则截断原始字符串 + return truncate(user_agent, max_length - 3, "...") diff --git a/main.py b/main.py index 22557f2..2be5c37 100644 --- a/main.py +++ b/main.py @@ -37,7 +37,6 @@ from app.service.recalculate import recalculate from app.service.redis_message_system import redis_message_system from app.service.stats_scheduler import start_stats_scheduler, stop_stats_scheduler from app.service.online_status_maintenance import schedule_online_status_maintenance -from app.service.realtime_online_cleanup import start_realtime_cleanup, stop_realtime_cleanup # 检查 New Relic 配置文件是否存在,如果存在则初始化 New Relic newrelic_config_path = os.path.join(os.path.dirname(__file__), "newrelic.ini") @@ -87,7 +86,6 @@ async def lifespan(app: FastAPI): await start_database_cleanup_scheduler() # 启动数据库清理调度器 redis_message_system.start() # 启动 Redis 消息系统 start_stats_scheduler() # 启动统计调度器 - start_realtime_cleanup() # 启动实时在线状态清理服务 schedule_online_status_maintenance() # 启动在线状态维护任务 load_achievements() # on shutdown @@ -95,7 +93,6 @@ async def lifespan(app: FastAPI): stop_scheduler() redis_message_system.stop() # 停止 Redis 消息系统 stop_stats_scheduler() # 停止统计调度器 - stop_realtime_cleanup() # 停止实时在线状态清理服务 await stop_cache_scheduler() # 停止缓存调度器 await stop_database_cleanup_scheduler() # 停止数据库清理调度器 await download_service.stop_health_check() # 停止下载服务健康检查