修复在线问题
This commit is contained in:
74
app/service/online_status_maintenance.py
Normal file
74
app/service/online_status_maintenance.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""
|
||||
在线状态维护服务
|
||||
|
||||
此模块提供在游玩状态下维护用户在线状态的功能,
|
||||
解决游玩时显示离线的问题。
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from app.dependencies.database import get_redis
|
||||
from app.log import logger
|
||||
from app.router.v2.stats import REDIS_PLAYING_USERS_KEY, _redis_exec, get_redis_message
|
||||
|
||||
|
||||
async def maintain_playing_users_online_status():
|
||||
"""
|
||||
维护正在游玩用户的在线状态
|
||||
|
||||
定期刷新正在游玩用户的metadata在线标记,
|
||||
确保他们在游玩过程中显示为在线状态。
|
||||
"""
|
||||
redis_sync = get_redis_message()
|
||||
redis_async = get_redis()
|
||||
|
||||
try:
|
||||
# 获取所有正在游玩的用户
|
||||
playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY)
|
||||
|
||||
if not playing_users:
|
||||
return
|
||||
|
||||
logger.debug(f"Maintaining online status for {len(playing_users)} playing users")
|
||||
|
||||
# 为每个游玩用户刷新metadata在线标记
|
||||
for user_id in playing_users:
|
||||
user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id)
|
||||
metadata_key = f"metadata:online:{user_id_str}"
|
||||
|
||||
# 设置或刷新metadata在线标记,过期时间为1小时
|
||||
await redis_async.set(metadata_key, "playing", ex=3600)
|
||||
|
||||
logger.debug(f"Updated metadata online status for {len(playing_users)} playing users")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error maintaining playing users online status: {e}")
|
||||
|
||||
|
||||
async def start_online_status_maintenance_task():
|
||||
"""
|
||||
启动在线状态维护任务
|
||||
|
||||
每5分钟运行一次维护任务,确保游玩用户保持在线状态
|
||||
"""
|
||||
logger.info("Starting online status maintenance task")
|
||||
|
||||
while True:
|
||||
try:
|
||||
await maintain_playing_users_online_status()
|
||||
# 每5分钟运行一次
|
||||
await asyncio.sleep(300)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in online status maintenance task: {e}")
|
||||
# 出错后等待30秒再重试
|
||||
await asyncio.sleep(30)
|
||||
|
||||
|
||||
def schedule_online_status_maintenance():
|
||||
"""
|
||||
调度在线状态维护任务
|
||||
"""
|
||||
task = asyncio.create_task(start_online_status_maintenance_task())
|
||||
return task
|
||||
136
app/service/online_status_manager.py
Normal file
136
app/service/online_status_manager.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""
|
||||
在线状态管理服务
|
||||
|
||||
此模块负责统一管理用户的在线状态,确保用户在连接WebSocket后立即显示为在线。
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
|
||||
from app.dependencies.database import get_redis
|
||||
from app.log import logger
|
||||
from app.router.v2.stats import add_online_user
|
||||
|
||||
|
||||
class OnlineStatusManager:
|
||||
"""在线状态管理器"""
|
||||
|
||||
@staticmethod
|
||||
async def set_user_online(user_id: int, hub_type: str = "general") -> None:
|
||||
"""
|
||||
设置用户为在线状态
|
||||
|
||||
Args:
|
||||
user_id: 用户ID
|
||||
hub_type: Hub类型 (metadata, spectator, multiplayer等)
|
||||
"""
|
||||
try:
|
||||
redis = get_redis()
|
||||
|
||||
# 1. 添加到在线用户集合
|
||||
await add_online_user(user_id)
|
||||
|
||||
# 2. 设置metadata在线标记,这是is_online检查的关键
|
||||
metadata_key = f"metadata:online:{user_id}"
|
||||
await redis.set(metadata_key, hub_type, ex=7200) # 2小时过期
|
||||
|
||||
# 3. 设置最后活跃时间戳
|
||||
last_seen_key = f"user:last_seen:{user_id}"
|
||||
await redis.set(last_seen_key, int(datetime.utcnow().timestamp()), ex=7200)
|
||||
|
||||
logger.debug(f"[OnlineStatusManager] User {user_id} set online via {hub_type}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[OnlineStatusManager] Error setting user {user_id} online: {e}")
|
||||
|
||||
@staticmethod
|
||||
async def refresh_user_online_status(user_id: int, hub_type: str = "active") -> None:
|
||||
"""
|
||||
刷新用户的在线状态
|
||||
|
||||
Args:
|
||||
user_id: 用户ID
|
||||
hub_type: 当前活动类型
|
||||
"""
|
||||
try:
|
||||
redis = get_redis()
|
||||
|
||||
# 刷新metadata在线标记
|
||||
metadata_key = f"metadata:online:{user_id}"
|
||||
await redis.set(metadata_key, hub_type, ex=7200)
|
||||
|
||||
# 刷新最后活跃时间
|
||||
last_seen_key = f"user:last_seen:{user_id}"
|
||||
await redis.set(last_seen_key, int(datetime.utcnow().timestamp()), ex=7200)
|
||||
|
||||
logger.debug(f"[OnlineStatusManager] Refreshed online status for user {user_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[OnlineStatusManager] Error refreshing user {user_id} status: {e}")
|
||||
|
||||
@staticmethod
|
||||
async def set_user_offline(user_id: int) -> None:
|
||||
"""
|
||||
设置用户为离线状态
|
||||
|
||||
Args:
|
||||
user_id: 用户ID
|
||||
"""
|
||||
try:
|
||||
redis = get_redis()
|
||||
|
||||
# 删除metadata在线标记
|
||||
metadata_key = f"metadata:online:{user_id}"
|
||||
await redis.delete(metadata_key)
|
||||
|
||||
# 从在线用户集合中移除
|
||||
from app.router.v2.stats import remove_online_user
|
||||
await remove_online_user(user_id)
|
||||
|
||||
logger.debug(f"[OnlineStatusManager] User {user_id} set offline")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[OnlineStatusManager] Error setting user {user_id} offline: {e}")
|
||||
|
||||
@staticmethod
|
||||
async def is_user_online(user_id: int) -> bool:
|
||||
"""
|
||||
检查用户是否在线
|
||||
|
||||
Args:
|
||||
user_id: 用户ID
|
||||
|
||||
Returns:
|
||||
bool: 用户是否在线
|
||||
"""
|
||||
try:
|
||||
redis = get_redis()
|
||||
metadata_key = f"metadata:online:{user_id}"
|
||||
is_online = await redis.exists(metadata_key)
|
||||
return bool(is_online)
|
||||
except Exception as e:
|
||||
logger.error(f"[OnlineStatusManager] Error checking user {user_id} online status: {e}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
async def get_online_users_count() -> int:
|
||||
"""
|
||||
获取在线用户数量
|
||||
|
||||
Returns:
|
||||
int: 在线用户数量
|
||||
"""
|
||||
try:
|
||||
from app.router.v2.stats import _get_online_users_count
|
||||
from app.dependencies.database import get_redis
|
||||
|
||||
redis = get_redis()
|
||||
return await _get_online_users_count(redis)
|
||||
except Exception as e:
|
||||
logger.error(f"[OnlineStatusManager] Error getting online users count: {e}")
|
||||
return 0
|
||||
|
||||
|
||||
# 单例实例
|
||||
online_status_manager = OnlineStatusManager()
|
||||
@@ -48,17 +48,19 @@ async def cleanup_stale_online_users() -> tuple[int, int]:
|
||||
online_cleaned = len(stale_online_users)
|
||||
logger.info(f"Cleaned {online_cleaned} stale online users")
|
||||
|
||||
# 对于游玩用户,我们也检查对应的spectator状态
|
||||
# 对于游玩用户,我们使用更保守的清理策略
|
||||
# 只有当用户明确不在任何hub连接中时才移除
|
||||
stale_playing_users = []
|
||||
for user_id in playing_users:
|
||||
user_id_str = (
|
||||
user_id.decode() if isinstance(user_id, bytes) else str(user_id)
|
||||
)
|
||||
|
||||
# 如果用户不在在线用户列表中,说明已经离线,也应该从游玩列表中移除
|
||||
if user_id_str in stale_online_users or user_id_str not in [
|
||||
u.decode() if isinstance(u, bytes) else str(u) for u in online_users
|
||||
]:
|
||||
metadata_key = f"metadata:online:{user_id_str}"
|
||||
|
||||
# 只有当metadata在线标记完全不存在且用户也不在在线列表中时,
|
||||
# 才认为用户真正离线
|
||||
if (not await redis_async.exists(metadata_key) and
|
||||
user_id_str not in [u.decode() if isinstance(u, bytes) else str(u) for u in online_users]):
|
||||
stale_playing_users.append(user_id_str)
|
||||
|
||||
# 清理过期的游玩用户
|
||||
|
||||
@@ -14,6 +14,7 @@ from app.database.playlists import Playlist
|
||||
from app.database.room import Room
|
||||
from app.database.score import Score
|
||||
from app.dependencies.database import get_redis, with_db
|
||||
from app.log import logger
|
||||
from app.models.metadata_hub import (
|
||||
TOTAL_SCORE_DISTRIBUTION_BINS,
|
||||
DailyChallengeInfo,
|
||||
@@ -93,16 +94,13 @@ class MetadataHub(Hub[MetadataClientState]):
|
||||
async def _clean_state(self, state: MetadataClientState) -> None:
|
||||
user_id = int(state.connection_id)
|
||||
|
||||
# Remove from online user tracking
|
||||
from app.router.v2.stats import remove_online_user
|
||||
|
||||
asyncio.create_task(remove_online_user(user_id))
|
||||
# Use centralized offline status management
|
||||
from app.service.online_status_manager import online_status_manager
|
||||
await online_status_manager.set_user_offline(user_id)
|
||||
|
||||
if state.pushable:
|
||||
await asyncio.gather(*self.broadcast_tasks(user_id, None))
|
||||
redis = get_redis()
|
||||
if await redis.exists(f"metadata:online:{state.connection_id}"):
|
||||
await redis.delete(f"metadata:online:{state.connection_id}")
|
||||
|
||||
async with with_db() as session:
|
||||
async with session.begin():
|
||||
user = (
|
||||
@@ -122,12 +120,16 @@ class MetadataHub(Hub[MetadataClientState]):
|
||||
|
||||
async def on_client_connect(self, client: Client) -> None:
|
||||
user_id = int(client.connection_id)
|
||||
self.get_or_create_state(client)
|
||||
store = self.get_or_create_state(client)
|
||||
|
||||
# Track online user
|
||||
from app.router.v2.stats import add_online_user
|
||||
# Use centralized online status management
|
||||
from app.service.online_status_manager import online_status_manager
|
||||
await online_status_manager.set_user_online(user_id, "metadata")
|
||||
|
||||
asyncio.create_task(add_online_user(user_id))
|
||||
# CRITICAL FIX: Set online status IMMEDIATELY upon connection
|
||||
# This matches the C# official implementation behavior
|
||||
store.status = OnlineStatus.ONLINE
|
||||
logger.info(f"[MetadataHub] Set user {user_id} status to ONLINE upon connection")
|
||||
|
||||
async with with_db() as session:
|
||||
async with session.begin():
|
||||
@@ -175,8 +177,23 @@ class MetadataHub(Hub[MetadataClientState]):
|
||||
room_id=daily_challenge_room.id,
|
||||
),
|
||||
)
|
||||
redis = get_redis()
|
||||
await redis.set(f"metadata:online:{user_id}", "")
|
||||
|
||||
# CRITICAL FIX: Immediately broadcast the user's online status to all watchers
|
||||
# This ensures the user appears as "currently online" right after connection
|
||||
# Similar to the C# implementation's immediate broadcast logic
|
||||
online_presence_tasks = self.broadcast_tasks(user_id, store)
|
||||
if online_presence_tasks:
|
||||
await asyncio.gather(*online_presence_tasks)
|
||||
logger.info(f"[MetadataHub] Broadcasted online status for user {user_id} to watchers")
|
||||
|
||||
# Also send the user's own presence update to confirm online status
|
||||
await self.call_noblock(
|
||||
client,
|
||||
"UserPresenceUpdated",
|
||||
user_id,
|
||||
store.for_push,
|
||||
)
|
||||
logger.info(f"[MetadataHub] User {user_id} is now ONLINE and visible to other clients")
|
||||
|
||||
async def UpdateStatus(self, client: Client, status: int) -> None:
|
||||
status_ = OnlineStatus(status)
|
||||
@@ -214,19 +231,22 @@ class MetadataHub(Hub[MetadataClientState]):
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
async def BeginWatchingUserPresence(self, client: Client) -> None:
|
||||
# Critical fix: Send all currently online users to the new watcher
|
||||
# Must use for_push to get the correct UserPresence format
|
||||
await asyncio.gather(
|
||||
*[
|
||||
self.call_noblock(
|
||||
client,
|
||||
"UserPresenceUpdated",
|
||||
user_id,
|
||||
store,
|
||||
store.for_push, # Fixed: use for_push instead of store
|
||||
)
|
||||
for user_id, store in self.state.items()
|
||||
if store.pushable
|
||||
]
|
||||
)
|
||||
self.add_to_group(client, self.online_presence_watchers_group())
|
||||
logger.info(f"[MetadataHub] Client {client.connection_id} now watching user presence, sent {len([s for s in self.state.values() if s.pushable])} online users")
|
||||
|
||||
async def EndWatchingUserPresence(self, client: Client) -> None:
|
||||
self.remove_from_group(client, self.online_presence_watchers_group())
|
||||
|
||||
@@ -164,10 +164,9 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
|
||||
async def _clean_state(self, state: MultiplayerClientState):
|
||||
user_id = int(state.connection_id)
|
||||
|
||||
# Remove from online user tracking
|
||||
from app.router.v2.stats import remove_online_user
|
||||
|
||||
asyncio.create_task(remove_online_user(user_id))
|
||||
# Use centralized offline status management
|
||||
from app.service.online_status_manager import online_status_manager
|
||||
await online_status_manager.set_user_offline(user_id)
|
||||
|
||||
if state.room_id != 0 and state.room_id in self.rooms:
|
||||
server_room = self.rooms[state.room_id]
|
||||
@@ -182,10 +181,9 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
|
||||
"""Track online users when connecting to multiplayer hub"""
|
||||
logger.info(f"[MultiplayerHub] Client {client.user_id} connected")
|
||||
|
||||
# Track online user
|
||||
from app.router.v2.stats import add_online_user
|
||||
|
||||
asyncio.create_task(add_online_user(client.user_id))
|
||||
# Use centralized online status management
|
||||
from app.service.online_status_manager import online_status_manager
|
||||
await online_status_manager.set_user_online(client.user_id, "multiplayer")
|
||||
|
||||
def _ensure_in_room(self, client: Client) -> ServerMultiplayerRoom:
|
||||
store = self.get_or_create_state(client)
|
||||
|
||||
@@ -171,10 +171,9 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
"""
|
||||
user_id = int(state.connection_id)
|
||||
|
||||
# Remove from online and playing tracking
|
||||
from app.router.v2.stats import remove_online_user
|
||||
|
||||
asyncio.create_task(remove_online_user(user_id))
|
||||
# Use centralized offline status management
|
||||
from app.service.online_status_manager import online_status_manager
|
||||
await online_status_manager.set_user_offline(user_id)
|
||||
|
||||
if state.state:
|
||||
await self._end_session(user_id, state.state, state)
|
||||
@@ -197,10 +196,9 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
"""
|
||||
logger.info(f"[SpectatorHub] Client {client.user_id} connected")
|
||||
|
||||
# Track online user
|
||||
from app.router.v2.stats import add_online_user
|
||||
|
||||
asyncio.create_task(add_online_user(client.user_id))
|
||||
# Use centralized online status management
|
||||
from app.service.online_status_manager import online_status_manager
|
||||
await online_status_manager.set_user_online(client.user_id, "spectator")
|
||||
|
||||
# Send all current player states to the new client
|
||||
# This matches the official OnConnectedAsync behavior
|
||||
@@ -269,7 +267,7 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
|
||||
# Critical addition: Notify about finished players in multiplayer games
|
||||
elif (
|
||||
room_user.state == MultiplayerUserState.RESULTS
|
||||
hasattr(room_user.state, 'name') and room_user.state.name == 'RESULTS'
|
||||
and room_user.user_id not in self.state
|
||||
):
|
||||
try:
|
||||
@@ -340,10 +338,15 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
)
|
||||
logger.info(f"[SpectatorHub] {client.user_id} began playing {state.beatmap_id}")
|
||||
|
||||
# Track playing user
|
||||
# Track playing user and maintain online status
|
||||
from app.router.v2.stats import add_playing_user
|
||||
from app.service.online_status_manager import online_status_manager
|
||||
|
||||
asyncio.create_task(add_playing_user(user_id))
|
||||
|
||||
# Critical fix: Maintain metadata online presence during gameplay
|
||||
# This ensures the user appears online while playing
|
||||
await online_status_manager.refresh_user_online_status(user_id, "playing")
|
||||
|
||||
# # 预缓存beatmap文件以加速后续PP计算
|
||||
# await self._preload_beatmap_for_pp_calculation(state.beatmap_id)
|
||||
@@ -357,21 +360,25 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
|
||||
async def SendFrameData(self, client: Client, frame_data: FrameDataBundle) -> None:
|
||||
user_id = int(client.connection_id)
|
||||
state = self.get_or_create_state(client)
|
||||
if not state.score:
|
||||
store = self.get_or_create_state(client)
|
||||
if store.state is None or store.score is None:
|
||||
return
|
||||
state.score.score_info.accuracy = frame_data.header.accuracy
|
||||
state.score.score_info.combo = frame_data.header.combo
|
||||
state.score.score_info.max_combo = frame_data.header.max_combo
|
||||
state.score.score_info.statistics = frame_data.header.statistics
|
||||
state.score.score_info.total_score = frame_data.header.total_score
|
||||
state.score.score_info.mods = frame_data.header.mods
|
||||
state.score.replay_frames.extend(frame_data.frames)
|
||||
|
||||
# Critical fix: Refresh online status during active gameplay
|
||||
# This prevents users from appearing offline while playing
|
||||
from app.service.online_status_manager import online_status_manager
|
||||
await online_status_manager.refresh_user_online_status(user_id, "playing_active")
|
||||
|
||||
header = frame_data.header
|
||||
score_info = store.score.score_info
|
||||
score_info.accuracy = header.accuracy
|
||||
score_info.combo = header.combo
|
||||
score_info.max_combo = header.max_combo
|
||||
score_info.statistics = header.statistics
|
||||
store.score.replay_frames.extend(frame_data.frames)
|
||||
|
||||
await self.broadcast_group_call(
|
||||
self.group_id(user_id),
|
||||
"UserSentFrames",
|
||||
user_id,
|
||||
frame_data,
|
||||
self.group_id(user_id), "UserSentFrames", user_id, frame_data
|
||||
)
|
||||
|
||||
async def EndPlaySession(self, client: Client, state: SpectatorState) -> None:
|
||||
|
||||
2
main.py
2
main.py
@@ -36,6 +36,7 @@ from app.service.osu_rx_statistics import create_rx_statistics
|
||||
from app.service.recalculate import recalculate
|
||||
from app.service.redis_message_system import redis_message_system
|
||||
from app.service.stats_scheduler import start_stats_scheduler, stop_stats_scheduler
|
||||
from app.service.online_status_maintenance import schedule_online_status_maintenance
|
||||
|
||||
# 检查 New Relic 配置文件是否存在,如果存在则初始化 New Relic
|
||||
newrelic_config_path = os.path.join(os.path.dirname(__file__), "newrelic.ini")
|
||||
@@ -85,6 +86,7 @@ async def lifespan(app: FastAPI):
|
||||
await start_database_cleanup_scheduler() # 启动数据库清理调度器
|
||||
redis_message_system.start() # 启动 Redis 消息系统
|
||||
start_stats_scheduler() # 启动统计调度器
|
||||
schedule_online_status_maintenance() # 启动在线状态维护任务
|
||||
load_achievements()
|
||||
# on shutdown
|
||||
yield
|
||||
|
||||
Reference in New Issue
Block a user