修复多人游戏排行榜问题

This commit is contained in:
咕谷酱
2025-08-22 13:52:28 +08:00
parent 6136b9fed3
commit b300ce9b09
13 changed files with 1008 additions and 324 deletions

167
MULTIPLAYER_IMPROVEMENTS.md Normal file
View File

@@ -0,0 +1,167 @@
# 多人游戏观战和实时排行榜改进说明
## 主要改进
### 1. 游戏状态缓冲区 (GameplayStateBuffer)
- **实时分数缓冲**: 为每个房间的每个玩家维护最多50帧的分数数据
- **实时排行榜**: 自动计算和维护实时排行榜数据
- **游戏状态快照**: 为新加入的观众创建完整的游戏状态快照
- **观战者状态缓存**: 跟踪观战者状态以优化同步
### 2. 观战同步管理器 (SpectatorSyncManager)
- **跨Hub通信**: 通过Redis在MultiplayerHub和SpectatorHub之间同步状态
- **事件通知**: 游戏开始/结束、用户状态变化等事件的实时通知
- **异步消息处理**: 订阅和处理观战相关事件
### 3. 增强的MultiplayerHub功能
#### 新增方法:
- `UpdateScore(client, score_data)`: 接收实时分数更新
- `GetLeaderboard(client)`: 获取当前排行榜
- `RequestSpectatorSync(client)`: 观战者请求状态同步
#### 改进的方法:
- `JoinRoomWithPassword`: 增强新用户加入时的状态同步
- `ChangeState`: 添加观战状态处理和分数缓冲区管理
- `start_gameplay`: 启动实时排行榜广播和创建游戏快照
- `change_room_state`: 处理游戏结束时的清理工作
### 4. 实时排行榜系统
- **自动广播**: 每秒更新一次实时排行榜
- **智能启停**: 根据游戏状态自动启动/停止广播任务
- **最终排行榜**: 游戏结束时发送最终排行榜
## 客户端集成示例
### JavaScript客户端示例
```javascript
// 连接到MultiplayerHub
const connection = new signalR.HubConnectionBuilder()
.withUrl("/multiplayer")
.build();
// 监听实时排行榜更新
connection.on("LeaderboardUpdate", (leaderboard) => {
updateLeaderboardUI(leaderboard);
});
// 监听游戏状态同步(观战者)
connection.on("GameplayStateSync", (snapshot) => {
syncSpectatorUI(snapshot);
});
// 监听最终排行榜
connection.on("FinalLeaderboard", (finalLeaderboard) => {
showFinalResults(finalLeaderboard);
});
// 发送分数更新(玩家)
async function updateScore(scoreData) {
try {
await connection.invoke("UpdateScore", scoreData);
} catch (err) {
console.error("Error updating score:", err);
}
}
// 请求观战同步(观战者)
async function requestSpectatorSync() {
try {
await connection.invoke("RequestSpectatorSync");
} catch (err) {
console.error("Error requesting sync:", err);
}
}
// 获取当前排行榜
async function getCurrentLeaderboard() {
try {
return await connection.invoke("GetLeaderboard");
} catch (err) {
console.error("Error getting leaderboard:", err);
return [];
}
}
```
### Python客户端示例
```python
import signalrcore
# 创建连接
connection = signalrcore.HubConnectionBuilder() \
.with_url("ws://localhost:8000/multiplayer") \
.build()
# 监听排行榜更新
def on_leaderboard_update(leaderboard):
print("Leaderboard update:", leaderboard)
# 更新UI显示排行榜
connection.on("LeaderboardUpdate", on_leaderboard_update)
# 监听游戏状态同步
def on_gameplay_state_sync(snapshot):
print("Gameplay state sync:", snapshot)
# 同步观战界面
connection.on("GameplayStateSync", on_gameplay_state_sync)
# 发送分数更新
async def send_score_update(score, combo, accuracy):
await connection.send("UpdateScore", {
"score": score,
"combo": combo,
"accuracy": accuracy,
"completed": False
})
# 启动连接
connection.start()
```
## 配置要求
### Redis配置
确保Redis服务器运行并配置正确的连接参数
```python
# 在app/dependencies/database.py中
REDIS_CONFIG = {
'host': 'localhost',
'port': 6379,
'db': 0,
'decode_responses': True
}
```
### 数据库表结构
确保`multiplayer_event`表包含以下字段:
- `event_detail`: JSON字段用于存储事件详细信息
## 性能优化建议
1. **缓冲区大小调整**: 根据实际需求调整分数帧缓冲区大小默认50帧
2. **广播频率调整**: 可以根据网络条件调整排行榜广播频率默认1秒
3. **内存清理**: 定期清理过期的游戏状态快照和观战者状态
4. **连接池优化**: 配置Redis连接池以处理高并发请求
## 故障排除
### 常见问题
1. **排行榜不更新**: 检查Redis连接和广播任务状态
2. **观战者状态不同步**: 确认SpectatorSyncManager已正确初始化
3. **分数数据丢失**: 检查缓冲区大小和清理逻辑
### 日志监控
关键日志点:
- `[MultiplayerHub] Synced gameplay state for user X`
- `[MultiplayerHub] Broadcasted leaderboard update to room X`
- `Error updating score for user X`
- `Error in leaderboard broadcast loop`
### 调试模式
在开发环境中启用详细日志:
```python
import logging
logging.getLogger("app.signalr.hub.multiplayer").setLevel(logging.DEBUG)
```

View File

@@ -23,7 +23,7 @@ class EmailVerification(SQLModel, table=True):
is_used: bool = Field(default=False) # 是否已使用 is_used: bool = Field(default=False) # 是否已使用
used_at: datetime | None = Field(default=None) used_at: datetime | None = Field(default=None)
ip_address: str | None = Field(default=None) # 请求IP ip_address: str | None = Field(default=None) # 请求IP
user_agent: str | None = Field(default=None) # 用户代理 user_agent: str | None = Field(default=None, max_length=255) # 用户代理
class LoginSession(SQLModel, table=True): class LoginSession(SQLModel, table=True):
@@ -35,7 +35,7 @@ class LoginSession(SQLModel, table=True):
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), nullable=False, index=True)) user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), nullable=False, index=True))
session_token: str = Field(unique=True, index=True) # 会话令牌 session_token: str = Field(unique=True, index=True) # 会话令牌
ip_address: str = Field() # 登录IP ip_address: str = Field() # 登录IP
user_agent: str | None = Field(default=None) user_agent: str | None = Field(default=None, max_length=255) # 用户代理限制长度为255字符
country_code: str | None = Field(default=None) country_code: str | None = Field(default=None)
is_verified: bool = Field(default=False) # 是否已验证 is_verified: bool = Field(default=False) # 是否已验证
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))

View File

@@ -312,23 +312,7 @@ class UserResp(UserBase):
) )
).one() ).one()
redis = get_redis() redis = get_redis()
# 实时验证用户在线状态 u.is_online = await redis.exists(f"metadata:online:{obj.id}")
if obj.id is not None:
metadata_key = f"metadata:online:{obj.id}"
is_online_check = await redis.exists(metadata_key)
# 如果metadata键不存在立即从在线集合中清理该用户
if not is_online_check:
try:
from app.service.realtime_online_cleanup import realtime_cleanup
await realtime_cleanup.verify_user_online_status(obj.id)
except Exception as e:
from app.log import logger
logger.warning(f"Failed to verify user {obj.id} online status: {e}")
u.is_online = bool(is_online_check)
else:
u.is_online = False
u.cover_url = ( u.cover_url = (
obj.cover.get( obj.cover.get(
"url", "https://assets.ppy.sh/user-profile-covers/default.jpeg" "url", "https://assets.ppy.sh/user-profile-covers/default.jpeg"
@@ -418,27 +402,26 @@ class UserResp(UserBase):
for ua in await obj.awaitable_attrs.achievement for ua in await obj.awaitable_attrs.achievement
] ]
if "rank_history" in include: if "rank_history" in include:
if obj.id is not None: rank_history = await RankHistoryResp.from_db(session, obj.id, ruleset)
rank_history = await RankHistoryResp.from_db(session, obj.id, ruleset) if len(rank_history.data) != 0:
if len(rank_history.data) != 0: u.rank_history = rank_history
u.rank_history = rank_history
rank_top = ( rank_top = (
await session.exec( await session.exec(
select(RankTop).where( select(RankTop).where(
RankTop.user_id == obj.id, RankTop.mode == ruleset RankTop.user_id == obj.id, RankTop.mode == ruleset
)
) )
).first() )
if rank_top: ).first()
u.rank_highest = ( if rank_top:
RankHighest( u.rank_highest = (
rank=rank_top.rank, RankHighest(
updated_at=datetime.combine(rank_top.date, datetime.min.time()), rank=rank_top.rank,
) updated_at=datetime.combine(rank_top.date, datetime.min.time()),
if rank_top
else None
) )
if rank_top
else None
)
u.favourite_beatmapset_count = ( u.favourite_beatmapset_count = (
await session.exec( await session.exec(

View File

@@ -218,6 +218,10 @@ This email was sent automatically, please do not reply.
# 生成新的验证码 # 生成新的验证码
code = EmailVerificationService.generate_verification_code() code = EmailVerificationService.generate_verification_code()
# 解析用户代理字符串
from app.utils import parse_user_agent
parsed_user_agent = parse_user_agent(user_agent, max_length=255)
# 创建验证记录 # 创建验证记录
verification = EmailVerification( verification = EmailVerification(
user_id=user_id, user_id=user_id,
@@ -225,7 +229,7 @@ This email was sent automatically, please do not reply.
verification_code=code, verification_code=code,
expires_at=datetime.now(UTC) + timedelta(minutes=10), # 10分钟过期 expires_at=datetime.now(UTC) + timedelta(minutes=10), # 10分钟过期
ip_address=ip_address, ip_address=ip_address,
user_agent=user_agent user_agent=parsed_user_agent # 使用解析后的用户代理
) )
db.add(verification) db.add(verification)
@@ -388,13 +392,18 @@ class LoginSessionService:
is_new_location: bool = False is_new_location: bool = False
) -> LoginSession: ) -> LoginSession:
"""创建登录会话""" """创建登录会话"""
from app.utils import parse_user_agent
# 解析用户代理字符串,提取关键信息
parsed_user_agent = parse_user_agent(user_agent, max_length=255)
session_token = EmailVerificationService.generate_session_token() session_token = EmailVerificationService.generate_session_token()
session = LoginSession( session = LoginSession(
user_id=user_id, user_id=user_id,
session_token=session_token, session_token=session_token,
ip_address=ip_address, ip_address=ip_address,
user_agent=user_agent, user_agent=parsed_user_agent, # 使用解析后的用户代理
country_code=country_code, country_code=country_code,
is_new_location=is_new_location, is_new_location=is_new_location,
expires_at=datetime.now(UTC) + timedelta(hours=24), # 24小时过期 expires_at=datetime.now(UTC) + timedelta(hours=24), # 24小时过期

View File

@@ -20,6 +20,7 @@ async def maintain_playing_users_online_status():
定期刷新正在游玩用户的metadata在线标记 定期刷新正在游玩用户的metadata在线标记
确保他们在游玩过程中显示为在线状态。 确保他们在游玩过程中显示为在线状态。
但不会恢复已经退出的用户的在线状态。
""" """
redis_sync = get_redis_message() redis_sync = get_redis_message()
redis_async = get_redis() redis_async = get_redis()
@@ -31,17 +32,25 @@ async def maintain_playing_users_online_status():
if not playing_users: if not playing_users:
return return
logger.debug(f"Maintaining online status for {len(playing_users)} playing users") logger.debug(f"Checking online status for {len(playing_users)} playing users")
# 为每个游玩用户刷新metadata在线标记 # 仅为当前有效连接的用户刷新在线状态
updated_count = 0
for user_id in playing_users: for user_id in playing_users:
user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id)
metadata_key = f"metadata:online:{user_id_str}" metadata_key = f"metadata:online:{user_id_str}"
# 设置或刷新metadata在线标记过期时间为1小时 # 重要:首先检查用户是否已经有在线标记,只有存在才刷新
await redis_async.set(metadata_key, "playing", ex=3600) if await redis_async.exists(metadata_key):
# 只更新已经在线的用户的状态,不恢复已退出的用户
await redis_async.set(metadata_key, "playing", ex=3600)
updated_count += 1
else:
# 如果用户已退出(没有在线标记),则从游玩用户中移除
await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, user_id)
logger.debug(f"Removed user {user_id_str} from playing users as they are offline")
logger.debug(f"Updated metadata online status for {len(playing_users)} playing users") logger.debug(f"Updated metadata online status for {updated_count} playing users")
except Exception as e: except Exception as e:
logger.error(f"Error maintaining playing users online status: {e}") logger.error(f"Error maintaining playing users online status: {e}")

View File

@@ -1,196 +0,0 @@
"""
实时在线状态清理服务
此模块提供实时的在线状态清理功能,确保用户在断开连接后立即从在线列表中移除。
"""
from __future__ import annotations
import asyncio
import json
from datetime import datetime, timedelta
from app.dependencies.database import get_redis, get_redis_message
from app.log import logger
from app.router.v2.stats import (
REDIS_ONLINE_USERS_KEY,
REDIS_PLAYING_USERS_KEY,
_redis_exec,
)
class RealtimeOnlineCleanup:
"""实时在线状态清理器"""
def __init__(self):
self._running = False
self._task = None
async def start(self):
"""启动实时清理服务"""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._realtime_cleanup_loop())
logger.info("[RealtimeOnlineCleanup] Started realtime online cleanup service")
async def stop(self):
"""停止实时清理服务"""
if not self._running:
return
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("[RealtimeOnlineCleanup] Stopped realtime online cleanup service")
async def _realtime_cleanup_loop(self):
"""实时清理循环 - 每30秒检查一次"""
while self._running:
try:
# 执行快速清理
cleaned_count = await self._quick_cleanup_stale_users()
if cleaned_count > 0:
logger.debug(f"[RealtimeOnlineCleanup] Quick cleanup: removed {cleaned_count} stale users")
# 等待30秒
await asyncio.sleep(30)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"[RealtimeOnlineCleanup] Error in cleanup loop: {e}")
# 出错时等待30秒再重试
await asyncio.sleep(30)
async def _quick_cleanup_stale_users(self) -> int:
"""快速清理过期用户,返回清理数量"""
redis_sync = get_redis_message()
redis_async = get_redis()
total_cleaned = 0
try:
# 获取所有在线用户
online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY)
# 快速检查只检查metadata键是否存在
stale_users = []
for user_id in online_users:
user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id)
metadata_key = f"metadata:online:{user_id_str}"
# 如果metadata标记不存在立即标记为过期
if not await redis_async.exists(metadata_key):
stale_users.append(user_id_str)
# 立即清理过期用户
if stale_users:
# 从在线用户集合中移除
await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, *stale_users)
# 同时从游玩用户集合中移除(如果存在)
await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, *stale_users)
total_cleaned = len(stale_users)
logger.info(f"[RealtimeOnlineCleanup] Immediately cleaned {total_cleaned} stale users")
except Exception as e:
logger.error(f"[RealtimeOnlineCleanup] Error in quick cleanup: {e}")
return total_cleaned
async def verify_user_online_status(self, user_id: int) -> bool:
"""实时验证用户在线状态"""
try:
redis = get_redis()
metadata_key = f"metadata:online:{user_id}"
# 检查metadata键是否存在
exists = await redis.exists(metadata_key)
# 如果不存在,从在线集合中移除该用户
if not exists:
redis_sync = get_redis_message()
await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, str(user_id))
await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, str(user_id))
logger.debug(f"[RealtimeOnlineCleanup] Verified user {user_id} is offline, removed from sets")
return bool(exists)
except Exception as e:
logger.error(f"[RealtimeOnlineCleanup] Error verifying user {user_id} status: {e}")
return False
async def force_refresh_online_list(self):
"""强制刷新整个在线用户列表"""
try:
redis_sync = get_redis_message()
redis_async = get_redis()
# 获取所有在线用户
online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY)
playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY)
# 验证每个用户的在线状态
valid_online_users = []
valid_playing_users = []
for user_id in online_users:
user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id)
metadata_key = f"metadata:online:{user_id_str}"
if await redis_async.exists(metadata_key):
valid_online_users.append(user_id_str)
for user_id in playing_users:
user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id)
metadata_key = f"metadata:online:{user_id_str}"
if await redis_async.exists(metadata_key):
valid_playing_users.append(user_id_str)
# 重建在线用户集合
if online_users:
await _redis_exec(redis_sync.delete, REDIS_ONLINE_USERS_KEY)
if valid_online_users:
await _redis_exec(redis_sync.sadd, REDIS_ONLINE_USERS_KEY, *valid_online_users)
# 设置过期时间
await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3 * 3600)
# 重建游玩用户集合
if playing_users:
await _redis_exec(redis_sync.delete, REDIS_PLAYING_USERS_KEY)
if valid_playing_users:
await _redis_exec(redis_sync.sadd, REDIS_PLAYING_USERS_KEY, *valid_playing_users)
# 设置过期时间
await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3 * 3600)
cleaned_online = len(online_users) - len(valid_online_users)
cleaned_playing = len(playing_users) - len(valid_playing_users)
if cleaned_online > 0 or cleaned_playing > 0:
logger.info(f"[RealtimeOnlineCleanup] Force refresh: removed {cleaned_online} stale online users, {cleaned_playing} stale playing users")
return cleaned_online + cleaned_playing
except Exception as e:
logger.error(f"[RealtimeOnlineCleanup] Error in force refresh: {e}")
return 0
# 全局实例
realtime_cleanup = RealtimeOnlineCleanup()
def start_realtime_cleanup():
"""启动实时清理服务"""
asyncio.create_task(realtime_cleanup.start())
def stop_realtime_cleanup():
"""停止实时清理服务"""
asyncio.create_task(realtime_cleanup.stop())

View File

@@ -11,14 +11,13 @@ from app.router.v2.stats import (
) )
async def cleanup_stale_online_users() -> tuple[int, int, int]: async def cleanup_stale_online_users() -> tuple[int, int]:
"""清理过期的在线和游玩用户,返回清理的用户数(online_cleaned, playing_cleaned, metadata_cleaned)""" """清理过期的在线和游玩用户,返回清理的用户数"""
redis_sync = get_redis_message() redis_sync = get_redis_message()
redis_async = get_redis() redis_async = get_redis()
online_cleaned = 0 online_cleaned = 0
playing_cleaned = 0 playing_cleaned = 0
metadata_cleaned = 0
try: try:
# 获取所有在线用户 # 获取所有在线用户
@@ -72,63 +71,10 @@ async def cleanup_stale_online_users() -> tuple[int, int, int]:
playing_cleaned = len(stale_playing_users) playing_cleaned = len(stale_playing_users)
logger.info(f"Cleaned {playing_cleaned} stale playing users") logger.info(f"Cleaned {playing_cleaned} stale playing users")
# 新增清理过期的metadata在线标记
# 这个步骤用于清理那些由于异常断开连接而没有被正常清理的metadata键
metadata_cleaned = 0
try:
# 查找所有metadata:online:*键
metadata_keys = []
cursor = 0
pattern = "metadata:online:*"
# 使用SCAN命令遍历所有匹配的键
while True:
cursor, keys = await redis_async.scan(cursor=cursor, match=pattern, count=100)
metadata_keys.extend(keys)
if cursor == 0:
break
# 检查这些键是否对应有效的在线用户
orphaned_metadata_keys = []
for key in metadata_keys:
if isinstance(key, bytes):
key_str = key.decode()
else:
key_str = key
# 从键名中提取用户ID
user_id = key_str.replace("metadata:online:", "")
# 检查用户是否在在线用户集合中
is_in_online_set = await _redis_exec(redis_sync.sismember, REDIS_ONLINE_USERS_KEY, user_id)
is_in_playing_set = await _redis_exec(redis_sync.sismember, REDIS_PLAYING_USERS_KEY, user_id)
# 如果用户既不在在线集合也不在游玩集合中检查TTL
if not is_in_online_set and not is_in_playing_set:
# 检查键的TTL
ttl = await redis_async.ttl(key_str)
# TTL < 0 表示键没有过期时间或已过期
# 我们只清理那些明确过期或没有设置TTL的键
if ttl < 0:
# 再次确认键确实存在且没有对应的活跃连接
key_value = await redis_async.get(key_str)
if key_value:
# 键存在但用户不在任何集合中且没有有效TTL可以安全删除
orphaned_metadata_keys.append(key_str)
# 清理孤立的metadata键
if orphaned_metadata_keys:
await redis_async.delete(*orphaned_metadata_keys)
metadata_cleaned = len(orphaned_metadata_keys)
logger.info(f"Cleaned {metadata_cleaned} orphaned metadata:online keys")
except Exception as e:
logger.error(f"Error cleaning orphaned metadata keys: {e}")
except Exception as e: except Exception as e:
logger.error(f"Error cleaning stale users: {e}") logger.error(f"Error cleaning stale users: {e}")
return online_cleaned, playing_cleaned, metadata_cleaned return online_cleaned, playing_cleaned
async def refresh_redis_key_expiry() -> None: async def refresh_redis_key_expiry() -> None:

View File

@@ -134,10 +134,10 @@ class StatsScheduler:
"""清理循环 - 每10分钟清理一次过期用户""" """清理循环 - 每10分钟清理一次过期用户"""
# 启动时立即执行一次清理 # 启动时立即执行一次清理
try: try:
online_cleaned, playing_cleaned, metadata_cleaned = await cleanup_stale_online_users() online_cleaned, playing_cleaned = await cleanup_stale_online_users()
if online_cleaned > 0 or playing_cleaned > 0 or metadata_cleaned > 0: if online_cleaned > 0 or playing_cleaned > 0:
logger.info( logger.info(
f"Initial cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users, {metadata_cleaned} orphaned metadata keys" f"Initial cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users"
) )
await refresh_redis_key_expiry() await refresh_redis_key_expiry()
@@ -153,10 +153,10 @@ class StatsScheduler:
try: try:
# 清理过期用户 # 清理过期用户
online_cleaned, playing_cleaned, metadata_cleaned = await cleanup_stale_online_users() online_cleaned, playing_cleaned = await cleanup_stale_online_users()
if online_cleaned > 0 or playing_cleaned > 0 or metadata_cleaned > 0: if online_cleaned > 0 or playing_cleaned > 0:
logger.info( logger.info(
f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users, {metadata_cleaned} orphaned metadata keys" f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users"
) )
# 刷新Redis key过期时间 # 刷新Redis key过期时间

View File

@@ -202,11 +202,6 @@ class MetadataHub(Hub[MetadataClientState]):
if store.status is not None and store.status == status_: if store.status is not None and store.status == status_:
return return
store.status = OnlineStatus(status_) store.status = OnlineStatus(status_)
# 刷新用户在线状态
from app.service.online_status_manager import online_status_manager
await online_status_manager.refresh_user_online_status(user_id, f"status_{status_.name.lower()}")
tasks = self.broadcast_tasks(user_id, store) tasks = self.broadcast_tasks(user_id, store)
tasks.add( tasks.add(
self.call_noblock( self.call_noblock(
@@ -224,12 +219,6 @@ class MetadataHub(Hub[MetadataClientState]):
user_id = int(client.connection_id) user_id = int(client.connection_id)
store = self.get_or_create_state(client) store = self.get_or_create_state(client)
store.activity = activity store.activity = activity
# 刷新用户在线状态
from app.service.online_status_manager import online_status_manager
activity_type = type(activity).__name__ if activity else 'active'
await online_status_manager.refresh_user_online_status(user_id, f"activity_{activity_type}")
tasks = self.broadcast_tasks(user_id, store) tasks = self.broadcast_tasks(user_id, store)
tasks.add( tasks.add(
self.call_noblock( self.call_noblock(

View File

@@ -2,7 +2,9 @@ from __future__ import annotations
import asyncio import asyncio
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
from typing import override from typing import override, Dict, List, Optional, Tuple
import json
from collections import defaultdict, deque
from app.database import Room from app.database import Room
from app.database.beatmap import Beatmap from app.database.beatmap import Beatmap
@@ -54,6 +56,152 @@ from sqlmodel import col, exists, select
GAMEPLAY_LOAD_TIMEOUT = 30 GAMEPLAY_LOAD_TIMEOUT = 30
class GameplayStateBuffer:
"""游戏状态缓冲区,用于管理实时排行榜和观战数据同步"""
def __init__(self):
# 房间ID -> 用户分数数据缓冲区
self.score_buffers: Dict[int, Dict[int, deque]] = defaultdict(lambda: defaultdict(lambda: deque(maxlen=50)))
# 房间ID -> 实时排行榜数据
self.leaderboards: Dict[int, List[Dict]] = defaultdict(list)
# 房间ID -> 游戏状态快照
self.gameplay_snapshots: Dict[int, Dict] = {}
# 用户观战状态缓存
self.spectator_states: Dict[Tuple[int, int], Dict] = {} # (room_id, user_id) -> state
async def add_score_frame(self, room_id: int, user_id: int, frame_data: Dict):
"""添加分数帧数据到缓冲区"""
self.score_buffers[room_id][user_id].append({
**frame_data,
'timestamp': datetime.now(UTC),
'user_id': user_id
})
# 更新实时排行榜
await self._update_leaderboard(room_id)
async def _update_leaderboard(self, room_id: int):
"""更新实时排行榜"""
leaderboard = []
for user_id, frames in self.score_buffers[room_id].items():
if not frames:
continue
latest_frame = frames[-1]
leaderboard.append({
'user_id': user_id,
'score': latest_frame.get('score', 0),
'combo': latest_frame.get('combo', 0),
'accuracy': latest_frame.get('accuracy', 0.0),
'completed': latest_frame.get('completed', False),
'timestamp': latest_frame['timestamp']
})
# 按分数排序
leaderboard.sort(key=lambda x: (-x['score'], -x['accuracy']))
self.leaderboards[room_id] = leaderboard
def get_leaderboard(self, room_id: int) -> List[Dict]:
"""获取房间实时排行榜"""
return self.leaderboards.get(room_id, [])
async def create_gameplay_snapshot(self, room_id: int, room_data: Dict):
"""创建游戏状态快照用于新加入的观众"""
snapshot = {
'room_id': room_id,
'state': room_data.get('state'),
'current_item': room_data.get('current_item'),
'users': room_data.get('users', []),
'leaderboard': self.get_leaderboard(room_id),
'created_at': datetime.now(UTC)
}
self.gameplay_snapshots[room_id] = snapshot
return snapshot
def get_gameplay_snapshot(self, room_id: int) -> Optional[Dict]:
"""获取游戏状态快照"""
return self.gameplay_snapshots.get(room_id)
async def set_spectator_state(self, room_id: int, user_id: int, state_data: Dict):
"""设置观战者状态"""
key = (room_id, user_id)
self.spectator_states[key] = {
**state_data,
'last_updated': datetime.now(UTC)
}
def get_spectator_state(self, room_id: int, user_id: int) -> Optional[Dict]:
"""获取观战者状态"""
key = (room_id, user_id)
return self.spectator_states.get(key)
async def cleanup_room(self, room_id: int):
"""清理房间相关数据"""
self.score_buffers.pop(room_id, None)
self.leaderboards.pop(room_id, None)
self.gameplay_snapshots.pop(room_id, None)
# 清理观战者状态
keys_to_remove = [key for key in self.spectator_states.keys() if key[0] == room_id]
for key in keys_to_remove:
self.spectator_states.pop(key, None)
class SpectatorSyncManager:
"""观战同步管理器处理跨Hub通信"""
def __init__(self, redis_client):
self.redis = redis_client
self.channel_prefix = "multiplayer_spectator"
async def notify_spectator_hubs(self, room_id: int, event_type: str, data: Dict):
"""通知观战Hub游戏状态变化"""
message = {
'room_id': room_id,
'event_type': event_type,
'data': data,
'timestamp': datetime.now(UTC).isoformat()
}
channel = f"{self.channel_prefix}:room:{room_id}"
await self.redis.publish(channel, json.dumps(message))
async def notify_gameplay_started(self, room_id: int, game_data: Dict):
"""通知游戏开始"""
await self.notify_spectator_hubs(room_id, "gameplay_started", game_data)
async def notify_gameplay_ended(self, room_id: int, results_data: Dict):
"""通知游戏结束"""
await self.notify_spectator_hubs(room_id, "gameplay_ended", results_data)
async def notify_user_state_change(self, room_id: int, user_id: int, old_state: str, new_state: str):
"""通知用户状态变化"""
await self.notify_spectator_hubs(room_id, "user_state_changed", {
'user_id': user_id,
'old_state': old_state,
'new_state': new_state
})
async def subscribe_to_spectator_events(self, callback):
"""订阅观战事件"""
pattern = f"{self.channel_prefix}:*"
pubsub = self.redis.pubsub()
await pubsub.psubscribe(pattern)
async for message in pubsub.listen():
if message['type'] == 'pmessage':
try:
data = json.loads(message['data'])
await callback(message['channel'], data)
except Exception as e:
logger.error(f"Error processing spectator event: {e}")
# 全局实例
gameplay_buffer = GameplayStateBuffer()
class MultiplayerEventLogger: class MultiplayerEventLogger:
def __init__(self): def __init__(self):
pass pass
@@ -148,6 +296,79 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
super().__init__() super().__init__()
self.rooms: dict[int, ServerMultiplayerRoom] = {} self.rooms: dict[int, ServerMultiplayerRoom] = {}
self.event_logger = MultiplayerEventLogger() self.event_logger = MultiplayerEventLogger()
self.spectator_sync_manager: Optional[SpectatorSyncManager] = None
# 实时数据推送任务管理
self.leaderboard_tasks: Dict[int, asyncio.Task] = {}
# 观战状态同步任务
self.spectator_sync_tasks: Dict[int, asyncio.Task] = {}
async def initialize_managers(self):
"""初始化管理器"""
if not self.spectator_sync_manager:
redis = get_redis()
self.spectator_sync_manager = SpectatorSyncManager(redis)
# 启动观战事件监听
asyncio.create_task(self.spectator_sync_manager.subscribe_to_spectator_events(
self._handle_spectator_event
))
async def _handle_spectator_event(self, channel: str, data: Dict):
"""处理观战事件"""
try:
room_id = data.get('room_id')
event_type = data.get('event_type')
event_data = data.get('data', {})
if room_id and event_type and room_id in self.rooms:
server_room = self.rooms[room_id]
await self._process_spectator_event(server_room, event_type, event_data)
except Exception as e:
logger.error(f"Error handling spectator event: {e}")
async def _process_spectator_event(self, server_room: ServerMultiplayerRoom, event_type: str, event_data: Dict):
"""处理具体的观战事件"""
room_id = server_room.room.room_id
if event_type == "spectator_joined":
user_id = event_data.get('user_id')
if user_id:
await self._sync_spectator_with_current_state(room_id, user_id)
elif event_type == "request_leaderboard":
user_id = event_data.get('user_id')
if user_id:
leaderboard = gameplay_buffer.get_leaderboard(room_id)
await self._send_leaderboard_to_spectator(user_id, leaderboard)
async def _sync_spectator_with_current_state(self, room_id: int, user_id: int):
"""同步观战者与当前游戏状态"""
try:
snapshot = gameplay_buffer.get_gameplay_snapshot(room_id)
if snapshot:
# 通过Redis发送状态同步信息给SpectatorHub
redis = get_redis()
sync_data = {
'target_user': user_id,
'snapshot': snapshot,
'timestamp': datetime.now(UTC).isoformat()
}
await redis.publish(f"spectator_sync:{room_id}", json.dumps(sync_data))
except Exception as e:
logger.error(f"Error syncing spectator {user_id} with room {room_id}: {e}")
async def _send_leaderboard_to_spectator(self, user_id: int, leaderboard: List[Dict]):
"""发送排行榜数据给观战者"""
try:
redis = get_redis()
leaderboard_data = {
'target_user': user_id,
'leaderboard': leaderboard,
'timestamp': datetime.now(UTC).isoformat()
}
await redis.publish(f"leaderboard_update:{user_id}", json.dumps(leaderboard_data))
except Exception as e:
logger.error(f"Error sending leaderboard to spectator {user_id}: {e}")
@staticmethod @staticmethod
def group_id(room: int) -> str: def group_id(room: int) -> str:
@@ -271,6 +492,10 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
async def JoinRoomWithPassword(self, client: Client, room_id: int, password: str): async def JoinRoomWithPassword(self, client: Client, room_id: int, password: str):
logger.info(f"[MultiplayerHub] {client.user_id} joining room {room_id}") logger.info(f"[MultiplayerHub] {client.user_id} joining room {room_id}")
# 初始化管理器
await self.initialize_managers()
store = self.get_or_create_state(client) store = self.get_or_create_state(client)
if store.room_id != 0: if store.room_id != 0:
raise InvokeException("You are already in a room") raise InvokeException("You are already in a room")
@@ -293,9 +518,13 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
self.add_to_group(client, self.group_id(room_id)) self.add_to_group(client, self.group_id(room_id))
await server_room.match_type_handler.handle_join(user) await server_room.match_type_handler.handle_join(user)
# Critical fix: Send current room and gameplay state to new user # Enhanced: Send current room and gameplay state to new user
# This ensures spectators joining ongoing games get proper state sync # This ensures spectators joining ongoing games get proper state sync
await self._send_room_state_to_new_user(client, server_room) await self._send_room_state_to_new_user(client, server_room)
# 如果正在进行游戏,同步游戏状态
if room.state in [MultiplayerRoomState.PLAYING, MultiplayerRoomState.WAITING_FOR_LOAD]:
await self._sync_new_user_with_gameplay(client, server_room)
await self.event_logger.player_joined(room_id, user.user_id) await self.event_logger.player_joined(room_id, user.user_id)
@@ -327,6 +556,42 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
redis = get_redis() redis = get_redis()
await redis.publish("chat:room:joined", f"{room.channel_id}:{user.user_id}") await redis.publish("chat:room:joined", f"{room.channel_id}:{user.user_id}")
# 通知观战Hub有新用户加入
if self.spectator_sync_manager:
await self.spectator_sync_manager.notify_spectator_hubs(
room_id, "user_joined", {'user_id': user.user_id}
)
return room
async def _sync_new_user_with_gameplay(self, client: Client, room: ServerMultiplayerRoom):
"""同步新用户与正在进行的游戏状态"""
try:
room_id = room.room.room_id
# 获取游戏状态快照
snapshot = gameplay_buffer.get_gameplay_snapshot(room_id)
if not snapshot:
# 创建新的快照
room_data = {
'state': room.room.state,
'current_item': room.queue.current_item,
'users': [{'user_id': u.user_id, 'state': u.state} for u in room.room.users]
}
snapshot = await gameplay_buffer.create_gameplay_snapshot(room_id, room_data)
# 发送游戏状态到新用户
await self.broadcast_call(client.connection_id, "GameplayStateSync", snapshot)
# 发送实时排行榜
leaderboard = gameplay_buffer.get_leaderboard(room_id)
if leaderboard:
await self.broadcast_call(client.connection_id, "LeaderboardUpdate", leaderboard)
logger.info(f"[MultiplayerHub] Synced gameplay state for user {client.user_id} in room {room_id}")
except Exception as e:
logger.error(f"Error syncing new user with gameplay: {e}")
return room return room
@@ -704,14 +969,32 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
if user.state == state: if user.state == state:
return return
# 记录状态变化用于观战同步
old_state = user.state
# Special handling for state changes during gameplay # Special handling for state changes during gameplay
match state: match state:
case MultiplayerUserState.IDLE: case MultiplayerUserState.IDLE:
if user.state.is_playing: if user.state.is_playing:
# 玩家退出游戏时,清理分数缓冲区
room_id = room.room_id
if room_id in gameplay_buffer.score_buffers:
gameplay_buffer.score_buffers[room_id].pop(user.user_id, None)
await gameplay_buffer._update_leaderboard(room_id)
await self._broadcast_leaderboard_update(server_room)
return return
case MultiplayerUserState.LOADED | MultiplayerUserState.READY_FOR_GAMEPLAY: case MultiplayerUserState.LOADED | MultiplayerUserState.READY_FOR_GAMEPLAY:
if not user.state.is_playing: if not user.state.is_playing:
return return
case MultiplayerUserState.PLAYING:
# 开始游戏时初始化分数缓冲区
room_id = room.room_id
await gameplay_buffer.add_score_frame(room_id, user.user_id, {
'score': 0,
'combo': 0,
'accuracy': 100.0,
'completed': False
})
logger.info( logger.info(
f"[MultiplayerHub] User {user.user_id} changing state from {user.state} to {state}" f"[MultiplayerHub] User {user.user_id} changing state from {user.state} to {state}"
@@ -729,8 +1012,63 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
if state == MultiplayerUserState.SPECTATING: if state == MultiplayerUserState.SPECTATING:
await self.handle_spectator_state_change(client, server_room, user) await self.handle_spectator_state_change(client, server_room, user)
# 通知观战Hub状态变化
if self.spectator_sync_manager:
await self.spectator_sync_manager.notify_user_state_change(
room.room_id, user.user_id, old_state.name, state.name
)
await self.update_room_state(server_room) await self.update_room_state(server_room)
async def _broadcast_leaderboard_update(self, room: ServerMultiplayerRoom):
"""广播实时排行榜更新"""
try:
room_id = room.room.room_id
leaderboard = gameplay_buffer.get_leaderboard(room_id)
if leaderboard:
await self.broadcast_group_call(
self.group_id(room_id),
"LeaderboardUpdate",
leaderboard
)
logger.debug(f"[MultiplayerHub] Broadcasted leaderboard update to room {room_id}")
except Exception as e:
logger.error(f"Error broadcasting leaderboard update: {e}")
async def _start_leaderboard_broadcast_task(self, room_id: int):
"""启动实时排行榜广播任务"""
if room_id in self.leaderboard_tasks:
return
async def leaderboard_broadcast_loop():
try:
while room_id in self.rooms and room_id in self.leaderboard_tasks:
if room_id in self.rooms:
server_room = self.rooms[room_id]
if server_room.room.state == MultiplayerRoomState.PLAYING:
await self._broadcast_leaderboard_update(server_room)
await asyncio.sleep(1.0) # 每秒更新一次排行榜
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error in leaderboard broadcast loop for room {room_id}: {e}")
task = asyncio.create_task(leaderboard_broadcast_loop())
self.leaderboard_tasks[room_id] = task
async def _stop_leaderboard_broadcast_task(self, room_id: int):
"""停止实时排行榜广播任务"""
if room_id in self.leaderboard_tasks:
task = self.leaderboard_tasks.pop(room_id)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def change_user_state( async def change_user_state(
self, self,
room: ServerMultiplayerRoom, room: ServerMultiplayerRoom,
@@ -1017,16 +1355,44 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
async def change_room_state( async def change_room_state(
self, room: ServerMultiplayerRoom, state: MultiplayerRoomState self, room: ServerMultiplayerRoom, state: MultiplayerRoomState
): ):
old_state = room.room.state
room_id = room.room.room_id
logger.debug( logger.debug(
f"[MultiplayerHub] Room {room.room.room_id} state " f"[MultiplayerHub] Room {room_id} state "
f"changed from {room.room.state} to {state}" f"changed from {old_state} to {state}"
) )
room.room.state = state room.room.state = state
await self.broadcast_group_call( await self.broadcast_group_call(
self.group_id(room.room.room_id), self.group_id(room_id),
"RoomStateChanged", "RoomStateChanged",
state, state,
) )
# 处理状态变化的特殊逻辑
if old_state == MultiplayerRoomState.PLAYING and state == MultiplayerRoomState.OPEN:
# 游戏结束,停止实时排行榜广播
await self._stop_leaderboard_broadcast_task(room_id)
# 发送最终排行榜
leaderboard = gameplay_buffer.get_leaderboard(room_id)
if leaderboard:
await self.broadcast_group_call(
self.group_id(room_id),
"FinalLeaderboard",
leaderboard
)
# 通知观战Hub游戏结束
if self.spectator_sync_manager:
await self.spectator_sync_manager.notify_gameplay_ended(room_id, {
'leaderboard': leaderboard
})
elif state == MultiplayerRoomState.PLAYING:
# 游戏开始,启动实时排行榜
await self._start_leaderboard_broadcast_task(room_id)
async def StartMatch(self, client: Client): async def StartMatch(self, client: Client):
server_room = self._ensure_in_room(client) server_room = self._ensure_in_room(client)
@@ -1099,6 +1465,8 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
await room.stop_all_countdowns(ForceGameplayStartCountdown) await room.stop_all_countdowns(ForceGameplayStartCountdown)
playing = False playing = False
played_user = 0 played_user = 0
room_id = room.room.room_id
for user in room.room.users: for user in room.room.users:
client = self.get_client_by_id(str(user.user_id)) client = self.get_client_by_id(str(user.user_id))
if client is None: if client is None:
@@ -1112,6 +1480,15 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
played_user += 1 played_user += 1
await self.change_user_state(room, user, MultiplayerUserState.PLAYING) await self.change_user_state(room, user, MultiplayerUserState.PLAYING)
await self.call_noblock(client, "GameplayStarted") await self.call_noblock(client, "GameplayStarted")
# 初始化玩家分数缓冲区
await gameplay_buffer.add_score_frame(room_id, user.user_id, {
'score': 0,
'combo': 0,
'accuracy': 100.0,
'completed': False
})
elif user.state == MultiplayerUserState.WAITING_FOR_LOAD: elif user.state == MultiplayerUserState.WAITING_FOR_LOAD:
await self.change_user_state(room, user, MultiplayerUserState.IDLE) await self.change_user_state(room, user, MultiplayerUserState.IDLE)
await self.broadcast_group_call( await self.broadcast_group_call(
@@ -1119,11 +1496,28 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
"GameplayAborted", "GameplayAborted",
GameplayAbortReason.LOAD_TOOK_TOO_LONG, GameplayAbortReason.LOAD_TOOK_TOO_LONG,
) )
await self.change_room_state( await self.change_room_state(
room, room,
(MultiplayerRoomState.PLAYING if playing else MultiplayerRoomState.OPEN), (MultiplayerRoomState.PLAYING if playing else MultiplayerRoomState.OPEN),
) )
if playing: if playing:
# 创建游戏状态快照
room_data = {
'state': room.room.state,
'current_item': room.queue.current_item,
'users': [{'user_id': u.user_id, 'state': u.state} for u in room.room.users]
}
await gameplay_buffer.create_gameplay_snapshot(room_id, room_data)
# 启动实时排行榜广播
await self._start_leaderboard_broadcast_task(room_id)
# 通知观战Hub游戏开始
if self.spectator_sync_manager:
await self.spectator_sync_manager.notify_gameplay_started(room_id, room_data)
redis = get_redis() redis = get_redis()
await redis.set( await redis.set(
f"multiplayer:{room.room.room_id}:gameplay:players", f"multiplayer:{room.room.room_id}:gameplay:players",
@@ -1224,8 +1618,86 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
room.room.room_id, room.room.room_id,
room.room.host.user_id, room.room.host.user_id,
) )
del self.rooms[room.room.room_id]
logger.info(f"[MultiplayerHub] Room {room.room.room_id} ended") room_id = room.room.room_id
# 清理实时数据
await self._stop_leaderboard_broadcast_task(room_id)
await gameplay_buffer.cleanup_room(room_id)
# 清理观战同步任务
if room_id in self.spectator_sync_tasks:
task = self.spectator_sync_tasks.pop(room_id)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
del self.rooms[room_id]
logger.info(f"[MultiplayerHub] Room {room_id} ended")
async def UpdateScore(self, client: Client, score_data: Dict):
"""接收并处理实时分数更新"""
try:
server_room = self._ensure_in_room(client)
room = server_room.room
user = next((u for u in room.users if u.user_id == client.user_id), None)
if user is None:
raise InvokeException("User not found in room")
if room.state != MultiplayerRoomState.PLAYING:
return
if user.state != MultiplayerUserState.PLAYING:
return
room_id = room.room_id
# 添加分数帧到缓冲区
await gameplay_buffer.add_score_frame(room_id, client.user_id, {
'score': score_data.get('score', 0),
'combo': score_data.get('combo', 0),
'accuracy': score_data.get('accuracy', 0.0),
'completed': score_data.get('completed', False),
'hp': score_data.get('hp', 1.0),
'position': score_data.get('position', 0)
})
except Exception as e:
logger.error(f"Error updating score for user {client.user_id}: {e}")
async def GetLeaderboard(self, client: Client) -> List[Dict]:
"""获取当前房间的实时排行榜"""
try:
server_room = self._ensure_in_room(client)
room_id = server_room.room.room_id
return gameplay_buffer.get_leaderboard(room_id)
except Exception as e:
logger.error(f"Error getting leaderboard for user {client.user_id}: {e}")
return []
async def RequestSpectatorSync(self, client: Client):
"""观战者请求同步当前游戏状态"""
try:
server_room = self._ensure_in_room(client)
room_id = server_room.room.room_id
# 发送游戏状态快照
snapshot = gameplay_buffer.get_gameplay_snapshot(room_id)
if snapshot:
await self.broadcast_call(client.connection_id, "GameplayStateSync", snapshot)
# 发送当前排行榜
leaderboard = gameplay_buffer.get_leaderboard(room_id)
if leaderboard:
await self.broadcast_call(client.connection_id, "LeaderboardUpdate", leaderboard)
logger.info(f"[MultiplayerHub] Sent spectator sync to user {client.user_id}")
except Exception as e:
logger.error(f"Error handling spectator sync request: {e}")
async def LeaveRoom(self, client: Client): async def LeaveRoom(self, client: Client):
store = self.get_or_create_state(client) store = self.get_or_create_state(client)

View File

@@ -0,0 +1,251 @@
"""
SpectatorHub增强补丁
与MultiplayerHub集成以支持多人游戏观战
"""
from __future__ import annotations
import asyncio
import json
from datetime import datetime, UTC
from typing import Dict, Optional
from collections import defaultdict
from app.dependencies.database import get_redis
from app.log import logger
class SpectatorMultiplayerIntegration:
"""SpectatorHub的多人游戏集成扩展"""
def __init__(self, spectator_hub_instance):
self.hub = spectator_hub_instance
self.redis = None
self.multiplayer_subscribers = defaultdict(set) # room_id -> set of connection_ids
self.leaderboard_cache = {} # room_id -> leaderboard data
async def initialize(self):
"""初始化多人游戏集成"""
self.redis = get_redis()
# 启动Redis订阅任务
asyncio.create_task(self._subscribe_to_multiplayer_events())
asyncio.create_task(self._subscribe_to_leaderboard_updates())
asyncio.create_task(self._subscribe_to_spectator_sync())
async def _subscribe_to_multiplayer_events(self):
"""订阅多人游戏事件"""
try:
pubsub = self.redis.pubsub()
await pubsub.psubscribe("multiplayer_spectator:*")
async for message in pubsub.listen():
if message['type'] == 'pmessage':
try:
data = json.loads(message['data'])
await self._handle_multiplayer_event(message['channel'], data)
except Exception as e:
logger.error(f"Error processing multiplayer event: {e}")
except Exception as e:
logger.error(f"Error in multiplayer events subscription: {e}")
async def _subscribe_to_leaderboard_updates(self):
"""订阅排行榜更新"""
try:
pubsub = self.redis.pubsub()
await pubsub.psubscribe("leaderboard_update:*")
async for message in pubsub.listen():
if message['type'] == 'pmessage':
try:
data = json.loads(message['data'])
user_id = message['channel'].split(':')[-1]
await self._send_leaderboard_to_user(int(user_id), data['leaderboard'])
except Exception as e:
logger.error(f"Error processing leaderboard update: {e}")
except Exception as e:
logger.error(f"Error in leaderboard updates subscription: {e}")
async def _subscribe_to_spectator_sync(self):
"""订阅观战同步事件"""
try:
pubsub = self.redis.pubsub()
await pubsub.psubscribe("spectator_sync:*")
async for message in pubsub.listen():
if message['type'] == 'pmessage':
try:
data = json.loads(message['data'])
room_id = message['channel'].split(':')[-1]
await self._handle_spectator_sync(int(room_id), data)
except Exception as e:
logger.error(f"Error processing spectator sync: {e}")
except Exception as e:
logger.error(f"Error in spectator sync subscription: {e}")
async def _handle_multiplayer_event(self, channel: str, data: Dict):
"""处理多人游戏事件"""
room_id = data.get('room_id')
event_type = data.get('event_type')
event_data = data.get('data', {})
if not room_id or not event_type:
return
if event_type == "gameplay_started":
await self._handle_gameplay_started(room_id, event_data)
elif event_type == "gameplay_ended":
await self._handle_gameplay_ended(room_id, event_data)
elif event_type == "user_state_changed":
await self._handle_user_state_changed(room_id, event_data)
async def _handle_gameplay_started(self, room_id: int, game_data: Dict):
"""处理游戏开始事件"""
logger.info(f"[SpectatorHub] Multiplayer game started in room {room_id}")
# 通知所有观战该房间的用户
if room_id in self.multiplayer_subscribers:
for connection_id in self.multiplayer_subscribers[room_id]:
await self._send_to_connection(connection_id, "MultiplayerGameStarted", game_data)
async def _handle_gameplay_ended(self, room_id: int, results_data: Dict):
"""处理游戏结束事件"""
logger.info(f"[SpectatorHub] Multiplayer game ended in room {room_id}")
# 发送最终结果给观战者
if room_id in self.multiplayer_subscribers:
for connection_id in self.multiplayer_subscribers[room_id]:
await self._send_to_connection(connection_id, "MultiplayerGameEnded", results_data)
async def _handle_user_state_changed(self, room_id: int, state_data: Dict):
"""处理用户状态变化"""
user_id = state_data.get('user_id')
new_state = state_data.get('new_state')
if room_id in self.multiplayer_subscribers:
for connection_id in self.multiplayer_subscribers[room_id]:
await self._send_to_connection(connection_id, "MultiplayerUserStateChanged", {
'user_id': user_id,
'state': new_state
})
async def _handle_spectator_sync(self, room_id: int, sync_data: Dict):
"""处理观战同步请求"""
target_user = sync_data.get('target_user')
snapshot = sync_data.get('snapshot')
if target_user and snapshot:
# 找到目标用户的连接并发送快照
client = self.hub.get_client_by_id(str(target_user))
if client:
await self._send_to_connection(client.connection_id, "MultiplayerSnapshot", snapshot)
async def _send_leaderboard_to_user(self, user_id: int, leaderboard: list):
"""发送排行榜给指定用户"""
client = self.hub.get_client_by_id(str(user_id))
if client:
await self._send_to_connection(client.connection_id, "MultiplayerLeaderboard", leaderboard)
async def _send_to_connection(self, connection_id: str, method: str, data):
"""发送数据到指定连接"""
try:
await self.hub.broadcast_call(connection_id, method, data)
except Exception as e:
logger.error(f"Error sending {method} to connection {connection_id}: {e}")
async def subscribe_to_multiplayer_room(self, connection_id: str, room_id: int):
"""订阅多人游戏房间的观战"""
self.multiplayer_subscribers[room_id].add(connection_id)
# 通知MultiplayerHub有新的观战者
await self.redis.publish(f"multiplayer_spectator:room:{room_id}", json.dumps({
'event_type': 'spectator_joined',
'data': {'user_id': self._get_user_id_from_connection(connection_id)},
'room_id': room_id,
'timestamp': datetime.now(UTC).isoformat()
}))
logger.info(f"[SpectatorHub] Connection {connection_id} subscribed to multiplayer room {room_id}")
async def unsubscribe_from_multiplayer_room(self, connection_id: str, room_id: int):
"""取消订阅多人游戏房间"""
if room_id in self.multiplayer_subscribers:
self.multiplayer_subscribers[room_id].discard(connection_id)
if not self.multiplayer_subscribers[room_id]:
del self.multiplayer_subscribers[room_id]
logger.info(f"[SpectatorHub] Connection {connection_id} unsubscribed from multiplayer room {room_id}")
async def request_multiplayer_leaderboard(self, connection_id: str, room_id: int):
"""请求多人游戏排行榜"""
user_id = self._get_user_id_from_connection(connection_id)
if user_id:
await self.redis.publish(f"multiplayer_spectator:room:{room_id}", json.dumps({
'event_type': 'request_leaderboard',
'data': {'user_id': user_id},
'room_id': room_id,
'timestamp': datetime.now(UTC).isoformat()
}))
def _get_user_id_from_connection(self, connection_id: str) -> Optional[int]:
"""从连接ID获取用户ID"""
# 这需要根据实际的SpectatorHub实现来调整
for client in self.hub.clients.values():
if client.connection_id == connection_id:
return client.user_id
return None
# 在SpectatorHub类中添加的方法
async def init_multiplayer_integration(self):
"""初始化多人游戏集成"""
if not hasattr(self, 'multiplayer_integration'):
self.multiplayer_integration = SpectatorMultiplayerIntegration(self)
await self.multiplayer_integration.initialize()
async def WatchMultiplayerRoom(self, client, room_id: int):
"""开始观战多人游戏房间"""
try:
if not hasattr(self, 'multiplayer_integration'):
await self.init_multiplayer_integration()
await self.multiplayer_integration.subscribe_to_multiplayer_room(
client.connection_id, room_id
)
# 请求当前状态同步
await self.multiplayer_integration.request_multiplayer_leaderboard(
client.connection_id, room_id
)
return {"success": True, "message": f"Now watching multiplayer room {room_id}"}
except Exception as e:
logger.error(f"Error starting multiplayer room watch: {e}")
raise InvokeException(f"Failed to watch multiplayer room: {e}")
async def StopWatchingMultiplayerRoom(self, client, room_id: int):
"""停止观战多人游戏房间"""
try:
if hasattr(self, 'multiplayer_integration'):
await self.multiplayer_integration.unsubscribe_from_multiplayer_room(
client.connection_id, room_id
)
return {"success": True, "message": f"Stopped watching multiplayer room {room_id}"}
except Exception as e:
logger.error(f"Error stopping multiplayer room watch: {e}")
raise InvokeException(f"Failed to stop watching multiplayer room: {e}")
async def RequestMultiplayerLeaderboard(self, client, room_id: int):
"""请求多人游戏实时排行榜"""
try:
if not hasattr(self, 'multiplayer_integration'):
await self.init_multiplayer_integration()
await self.multiplayer_integration.request_multiplayer_leaderboard(
client.connection_id, room_id
)
return {"success": True, "message": "Leaderboard request sent"}
except Exception as e:
logger.error(f"Error requesting multiplayer leaderboard: {e}")
raise InvokeException(f"Failed to request leaderboard: {e}")

View File

@@ -124,3 +124,60 @@ def truncate(text: str, limit: int = 100, ellipsis: str = "...") -> str:
if len(text) > limit: if len(text) > limit:
return text[:limit] + ellipsis return text[:limit] + ellipsis
return text return text
def parse_user_agent(user_agent: str | None, max_length: int = 255) -> str | None:
"""
解析用户代理字符串,提取关键信息:设备、系统、浏览器
参数:
user_agent: 用户代理字符串
max_length: 最大长度限制
返回:
简化后的用户代理字符串
"""
if user_agent is None:
return None
# 检查是否是 osu! 客户端
if "osu!" in user_agent.lower():
return "osu!"
# 提取关键信息
parsed_info = []
# 提取设备信息
device_matches = [
# 常见移动设备型号
r"(iPhone|iPad|iPod|Android|ALI-AN00|SM-\w+|MI \w+|Redmi|HUAWEI|HONOR|POCO)",
# 其他设备关键词
r"(Windows NT|Macintosh|Linux|Ubuntu)"
]
import re
for pattern in device_matches:
matches = re.findall(pattern, user_agent)
if matches:
parsed_info.extend(matches)
# 提取浏览器信息
browser_matches = [
r"(Chrome|Firefox|Safari|Edge|MSIE|MQQBrowser|MiuiBrowser|OPR|Opera)",
r"(WebKit|Gecko|Trident)"
]
for pattern in browser_matches:
matches = re.findall(pattern, user_agent)
if matches:
# 只取第一个匹配的浏览器
parsed_info.append(matches[0])
break
# 组合信息
if parsed_info:
result = " / ".join(set(parsed_info))
return truncate(result, max_length - 3, "...")
# 如果无法解析,则截断原始字符串
return truncate(user_agent, max_length - 3, "...")

View File

@@ -37,7 +37,6 @@ from app.service.recalculate import recalculate
from app.service.redis_message_system import redis_message_system from app.service.redis_message_system import redis_message_system
from app.service.stats_scheduler import start_stats_scheduler, stop_stats_scheduler from app.service.stats_scheduler import start_stats_scheduler, stop_stats_scheduler
from app.service.online_status_maintenance import schedule_online_status_maintenance from app.service.online_status_maintenance import schedule_online_status_maintenance
from app.service.realtime_online_cleanup import start_realtime_cleanup, stop_realtime_cleanup
# 检查 New Relic 配置文件是否存在,如果存在则初始化 New Relic # 检查 New Relic 配置文件是否存在,如果存在则初始化 New Relic
newrelic_config_path = os.path.join(os.path.dirname(__file__), "newrelic.ini") newrelic_config_path = os.path.join(os.path.dirname(__file__), "newrelic.ini")
@@ -87,7 +86,6 @@ async def lifespan(app: FastAPI):
await start_database_cleanup_scheduler() # 启动数据库清理调度器 await start_database_cleanup_scheduler() # 启动数据库清理调度器
redis_message_system.start() # 启动 Redis 消息系统 redis_message_system.start() # 启动 Redis 消息系统
start_stats_scheduler() # 启动统计调度器 start_stats_scheduler() # 启动统计调度器
start_realtime_cleanup() # 启动实时在线状态清理服务
schedule_online_status_maintenance() # 启动在线状态维护任务 schedule_online_status_maintenance() # 启动在线状态维护任务
load_achievements() load_achievements()
# on shutdown # on shutdown
@@ -95,7 +93,6 @@ async def lifespan(app: FastAPI):
stop_scheduler() stop_scheduler()
redis_message_system.stop() # 停止 Redis 消息系统 redis_message_system.stop() # 停止 Redis 消息系统
stop_stats_scheduler() # 停止统计调度器 stop_stats_scheduler() # 停止统计调度器
stop_realtime_cleanup() # 停止实时在线状态清理服务
await stop_cache_scheduler() # 停止缓存调度器 await stop_cache_scheduler() # 停止缓存调度器
await stop_database_cleanup_scheduler() # 停止数据库清理调度器 await stop_database_cleanup_scheduler() # 停止数据库清理调度器
await download_service.stop_health_check() # 停止下载服务健康检查 await download_service.stop_health_check() # 停止下载服务健康检查