diff --git a/app/database/lazer_user.py b/app/database/lazer_user.py index e215dab..3bc9ec3 100644 --- a/app/database/lazer_user.py +++ b/app/database/lazer_user.py @@ -312,7 +312,23 @@ class UserResp(UserBase): ) ).one() redis = get_redis() - u.is_online = await redis.exists(f"metadata:online:{obj.id}") + # 实时验证用户在线状态 + if obj.id is not None: + metadata_key = f"metadata:online:{obj.id}" + is_online_check = await redis.exists(metadata_key) + + # 如果metadata键不存在,立即从在线集合中清理该用户 + if not is_online_check: + try: + from app.service.realtime_online_cleanup import realtime_cleanup + await realtime_cleanup.verify_user_online_status(obj.id) + except Exception as e: + from app.log import logger + logger.warning(f"Failed to verify user {obj.id} online status: {e}") + + u.is_online = bool(is_online_check) + else: + u.is_online = False u.cover_url = ( obj.cover.get( "url", "https://assets.ppy.sh/user-profile-covers/default.jpeg" @@ -402,26 +418,27 @@ class UserResp(UserBase): for ua in await obj.awaitable_attrs.achievement ] if "rank_history" in include: - rank_history = await RankHistoryResp.from_db(session, obj.id, ruleset) - if len(rank_history.data) != 0: - u.rank_history = rank_history + if obj.id is not None: + rank_history = await RankHistoryResp.from_db(session, obj.id, ruleset) + if len(rank_history.data) != 0: + u.rank_history = rank_history - rank_top = ( - await session.exec( - select(RankTop).where( - RankTop.user_id == obj.id, RankTop.mode == ruleset + rank_top = ( + await session.exec( + select(RankTop).where( + RankTop.user_id == obj.id, RankTop.mode == ruleset + ) ) - ) - ).first() - if rank_top: - u.rank_highest = ( - RankHighest( - rank=rank_top.rank, - updated_at=datetime.combine(rank_top.date, datetime.min.time()), + ).first() + if rank_top: + u.rank_highest = ( + RankHighest( + rank=rank_top.rank, + updated_at=datetime.combine(rank_top.date, datetime.min.time()), + ) + if rank_top + else None ) - if rank_top - else None - ) u.favourite_beatmapset_count = ( await session.exec( diff --git a/app/router/notification/message.py b/app/router/notification/message.py index 390db6b..8dfb35f 100644 --- a/app/router/notification/message.py +++ b/app/router/notification/message.py @@ -206,12 +206,19 @@ async def get_message( query = select(ChatMessage).where(ChatMessage.channel_id == channel_id) if since > 0: query = query.where(col(ChatMessage.message_id) > since) - if until is not None: - query = query.where(col(ChatMessage.message_id) < until) - - query = query.order_by(col(ChatMessage.message_id).desc()).limit(limit) - messages = (await session.exec(query)).all() - resp = [await ChatMessageResp.from_db(msg, session) for msg in messages] + # 获取 since 之后的消息,按时间正序 + query = query.order_by(col(ChatMessage.message_id).asc()).limit(limit) + messages = (await session.exec(query)).all() + resp = [await ChatMessageResp.from_db(msg, session) for msg in messages] + else: + # 获取最新消息,先按倒序获取最新的,然后反转为时间正序 + if until is not None: + query = query.where(col(ChatMessage.message_id) < until) + query = query.order_by(col(ChatMessage.message_id).desc()).limit(limit) + messages = (await session.exec(query)).all() + resp = [await ChatMessageResp.from_db(msg, session) for msg in messages] + # 反转为时间正序(最老的在前面) + resp.reverse() return resp diff --git a/app/service/realtime_online_cleanup.py b/app/service/realtime_online_cleanup.py new file mode 100644 index 0000000..3b0e38f --- /dev/null +++ b/app/service/realtime_online_cleanup.py @@ -0,0 +1,196 @@ +""" +实时在线状态清理服务 + +此模块提供实时的在线状态清理功能,确保用户在断开连接后立即从在线列表中移除。 +""" +from __future__ import annotations + +import asyncio +import json +from datetime import datetime, timedelta + +from app.dependencies.database import get_redis, get_redis_message +from app.log import logger +from app.router.v2.stats import ( + REDIS_ONLINE_USERS_KEY, + REDIS_PLAYING_USERS_KEY, + _redis_exec, +) + + +class RealtimeOnlineCleanup: + """实时在线状态清理器""" + + def __init__(self): + self._running = False + self._task = None + + async def start(self): + """启动实时清理服务""" + if self._running: + return + + self._running = True + self._task = asyncio.create_task(self._realtime_cleanup_loop()) + logger.info("[RealtimeOnlineCleanup] Started realtime online cleanup service") + + async def stop(self): + """停止实时清理服务""" + if not self._running: + return + + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + logger.info("[RealtimeOnlineCleanup] Stopped realtime online cleanup service") + + async def _realtime_cleanup_loop(self): + """实时清理循环 - 每30秒检查一次""" + while self._running: + try: + # 执行快速清理 + cleaned_count = await self._quick_cleanup_stale_users() + if cleaned_count > 0: + logger.debug(f"[RealtimeOnlineCleanup] Quick cleanup: removed {cleaned_count} stale users") + + # 等待30秒 + await asyncio.sleep(30) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"[RealtimeOnlineCleanup] Error in cleanup loop: {e}") + # 出错时等待30秒再重试 + await asyncio.sleep(30) + + async def _quick_cleanup_stale_users(self) -> int: + """快速清理过期用户,返回清理数量""" + redis_sync = get_redis_message() + redis_async = get_redis() + + total_cleaned = 0 + + try: + # 获取所有在线用户 + online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY) + + # 快速检查:只检查metadata键是否存在 + stale_users = [] + for user_id in online_users: + user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) + metadata_key = f"metadata:online:{user_id_str}" + + # 如果metadata标记不存在,立即标记为过期 + if not await redis_async.exists(metadata_key): + stale_users.append(user_id_str) + + # 立即清理过期用户 + if stale_users: + # 从在线用户集合中移除 + await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, *stale_users) + # 同时从游玩用户集合中移除(如果存在) + await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, *stale_users) + total_cleaned = len(stale_users) + + logger.info(f"[RealtimeOnlineCleanup] Immediately cleaned {total_cleaned} stale users") + + except Exception as e: + logger.error(f"[RealtimeOnlineCleanup] Error in quick cleanup: {e}") + + return total_cleaned + + async def verify_user_online_status(self, user_id: int) -> bool: + """实时验证用户在线状态""" + try: + redis = get_redis() + metadata_key = f"metadata:online:{user_id}" + + # 检查metadata键是否存在 + exists = await redis.exists(metadata_key) + + # 如果不存在,从在线集合中移除该用户 + if not exists: + redis_sync = get_redis_message() + await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, str(user_id)) + await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, str(user_id)) + logger.debug(f"[RealtimeOnlineCleanup] Verified user {user_id} is offline, removed from sets") + + return bool(exists) + + except Exception as e: + logger.error(f"[RealtimeOnlineCleanup] Error verifying user {user_id} status: {e}") + return False + + async def force_refresh_online_list(self): + """强制刷新整个在线用户列表""" + try: + redis_sync = get_redis_message() + redis_async = get_redis() + + # 获取所有在线用户 + online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY) + playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY) + + # 验证每个用户的在线状态 + valid_online_users = [] + valid_playing_users = [] + + for user_id in online_users: + user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) + metadata_key = f"metadata:online:{user_id_str}" + + if await redis_async.exists(metadata_key): + valid_online_users.append(user_id_str) + + for user_id in playing_users: + user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) + metadata_key = f"metadata:online:{user_id_str}" + + if await redis_async.exists(metadata_key): + valid_playing_users.append(user_id_str) + + # 重建在线用户集合 + if online_users: + await _redis_exec(redis_sync.delete, REDIS_ONLINE_USERS_KEY) + if valid_online_users: + await _redis_exec(redis_sync.sadd, REDIS_ONLINE_USERS_KEY, *valid_online_users) + # 设置过期时间 + await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3 * 3600) + + # 重建游玩用户集合 + if playing_users: + await _redis_exec(redis_sync.delete, REDIS_PLAYING_USERS_KEY) + if valid_playing_users: + await _redis_exec(redis_sync.sadd, REDIS_PLAYING_USERS_KEY, *valid_playing_users) + # 设置过期时间 + await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3 * 3600) + + cleaned_online = len(online_users) - len(valid_online_users) + cleaned_playing = len(playing_users) - len(valid_playing_users) + + if cleaned_online > 0 or cleaned_playing > 0: + logger.info(f"[RealtimeOnlineCleanup] Force refresh: removed {cleaned_online} stale online users, {cleaned_playing} stale playing users") + + return cleaned_online + cleaned_playing + + except Exception as e: + logger.error(f"[RealtimeOnlineCleanup] Error in force refresh: {e}") + return 0 + + +# 全局实例 +realtime_cleanup = RealtimeOnlineCleanup() + + +def start_realtime_cleanup(): + """启动实时清理服务""" + asyncio.create_task(realtime_cleanup.start()) + + +def stop_realtime_cleanup(): + """停止实时清理服务""" + asyncio.create_task(realtime_cleanup.stop()) diff --git a/app/service/redis_message_system.py b/app/service/redis_message_system.py index 51c7df6..46f256f 100644 --- a/app/service/redis_message_system.py +++ b/app/service/redis_message_system.py @@ -358,9 +358,7 @@ class RedisMessageSystem: # 确保消息按ID正序排序(时间顺序) messages.sort(key=lambda x: x.get("message_id", 0)) - # 如果是获取最新消息(since=0),需要保持倒序(最新的在前面) - if since == 0: - messages.reverse() + return messages return messages diff --git a/app/service/stats_cleanup.py b/app/service/stats_cleanup.py index a7c57d0..0106e7c 100644 --- a/app/service/stats_cleanup.py +++ b/app/service/stats_cleanup.py @@ -11,13 +11,14 @@ from app.router.v2.stats import ( ) -async def cleanup_stale_online_users() -> tuple[int, int]: - """清理过期的在线和游玩用户,返回清理的用户数""" +async def cleanup_stale_online_users() -> tuple[int, int, int]: + """清理过期的在线和游玩用户,返回清理的用户数(online_cleaned, playing_cleaned, metadata_cleaned)""" redis_sync = get_redis_message() redis_async = get_redis() online_cleaned = 0 playing_cleaned = 0 + metadata_cleaned = 0 try: # 获取所有在线用户 @@ -71,10 +72,63 @@ async def cleanup_stale_online_users() -> tuple[int, int]: playing_cleaned = len(stale_playing_users) logger.info(f"Cleaned {playing_cleaned} stale playing users") + # 新增:清理过期的metadata在线标记 + # 这个步骤用于清理那些由于异常断开连接而没有被正常清理的metadata键 + metadata_cleaned = 0 + try: + # 查找所有metadata:online:*键 + metadata_keys = [] + cursor = 0 + pattern = "metadata:online:*" + + # 使用SCAN命令遍历所有匹配的键 + while True: + cursor, keys = await redis_async.scan(cursor=cursor, match=pattern, count=100) + metadata_keys.extend(keys) + if cursor == 0: + break + + # 检查这些键是否对应有效的在线用户 + orphaned_metadata_keys = [] + for key in metadata_keys: + if isinstance(key, bytes): + key_str = key.decode() + else: + key_str = key + + # 从键名中提取用户ID + user_id = key_str.replace("metadata:online:", "") + + # 检查用户是否在在线用户集合中 + is_in_online_set = await _redis_exec(redis_sync.sismember, REDIS_ONLINE_USERS_KEY, user_id) + is_in_playing_set = await _redis_exec(redis_sync.sismember, REDIS_PLAYING_USERS_KEY, user_id) + + # 如果用户既不在在线集合也不在游玩集合中,检查TTL + if not is_in_online_set and not is_in_playing_set: + # 检查键的TTL + ttl = await redis_async.ttl(key_str) + # TTL < 0 表示键没有过期时间或已过期 + # 我们只清理那些明确过期或没有设置TTL的键 + if ttl < 0: + # 再次确认键确实存在且没有对应的活跃连接 + key_value = await redis_async.get(key_str) + if key_value: + # 键存在但用户不在任何集合中,且没有有效TTL,可以安全删除 + orphaned_metadata_keys.append(key_str) + + # 清理孤立的metadata键 + if orphaned_metadata_keys: + await redis_async.delete(*orphaned_metadata_keys) + metadata_cleaned = len(orphaned_metadata_keys) + logger.info(f"Cleaned {metadata_cleaned} orphaned metadata:online keys") + + except Exception as e: + logger.error(f"Error cleaning orphaned metadata keys: {e}") + except Exception as e: logger.error(f"Error cleaning stale users: {e}") - return online_cleaned, playing_cleaned + return online_cleaned, playing_cleaned, metadata_cleaned async def refresh_redis_key_expiry() -> None: diff --git a/app/service/stats_scheduler.py b/app/service/stats_scheduler.py index cef88d1..ead52ca 100644 --- a/app/service/stats_scheduler.py +++ b/app/service/stats_scheduler.py @@ -134,10 +134,10 @@ class StatsScheduler: """清理循环 - 每10分钟清理一次过期用户""" # 启动时立即执行一次清理 try: - online_cleaned, playing_cleaned = await cleanup_stale_online_users() - if online_cleaned > 0 or playing_cleaned > 0: + online_cleaned, playing_cleaned, metadata_cleaned = await cleanup_stale_online_users() + if online_cleaned > 0 or playing_cleaned > 0 or metadata_cleaned > 0: logger.info( - f"Initial cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users" + f"Initial cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users, {metadata_cleaned} orphaned metadata keys" ) await refresh_redis_key_expiry() @@ -153,10 +153,10 @@ class StatsScheduler: try: # 清理过期用户 - online_cleaned, playing_cleaned = await cleanup_stale_online_users() - if online_cleaned > 0 or playing_cleaned > 0: + online_cleaned, playing_cleaned, metadata_cleaned = await cleanup_stale_online_users() + if online_cleaned > 0 or playing_cleaned > 0 or metadata_cleaned > 0: logger.info( - f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users" + f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users, {metadata_cleaned} orphaned metadata keys" ) # 刷新Redis key过期时间 diff --git a/app/signalr/hub/metadata.py b/app/signalr/hub/metadata.py index 757e9c8..8335f0d 100644 --- a/app/signalr/hub/metadata.py +++ b/app/signalr/hub/metadata.py @@ -202,6 +202,11 @@ class MetadataHub(Hub[MetadataClientState]): if store.status is not None and store.status == status_: return store.status = OnlineStatus(status_) + + # 刷新用户在线状态 + from app.service.online_status_manager import online_status_manager + await online_status_manager.refresh_user_online_status(user_id, f"status_{status_.name.lower()}") + tasks = self.broadcast_tasks(user_id, store) tasks.add( self.call_noblock( @@ -219,6 +224,12 @@ class MetadataHub(Hub[MetadataClientState]): user_id = int(client.connection_id) store = self.get_or_create_state(client) store.activity = activity + + # 刷新用户在线状态 + from app.service.online_status_manager import online_status_manager + activity_type = type(activity).__name__ if activity else 'active' + await online_status_manager.refresh_user_online_status(user_id, f"activity_{activity_type}") + tasks = self.broadcast_tasks(user_id, store) tasks.add( self.call_noblock( diff --git a/main.py b/main.py index 2be5c37..22557f2 100644 --- a/main.py +++ b/main.py @@ -37,6 +37,7 @@ from app.service.recalculate import recalculate from app.service.redis_message_system import redis_message_system from app.service.stats_scheduler import start_stats_scheduler, stop_stats_scheduler from app.service.online_status_maintenance import schedule_online_status_maintenance +from app.service.realtime_online_cleanup import start_realtime_cleanup, stop_realtime_cleanup # 检查 New Relic 配置文件是否存在,如果存在则初始化 New Relic newrelic_config_path = os.path.join(os.path.dirname(__file__), "newrelic.ini") @@ -86,6 +87,7 @@ async def lifespan(app: FastAPI): await start_database_cleanup_scheduler() # 启动数据库清理调度器 redis_message_system.start() # 启动 Redis 消息系统 start_stats_scheduler() # 启动统计调度器 + start_realtime_cleanup() # 启动实时在线状态清理服务 schedule_online_status_maintenance() # 启动在线状态维护任务 load_achievements() # on shutdown @@ -93,6 +95,7 @@ async def lifespan(app: FastAPI): stop_scheduler() redis_message_system.stop() # 停止 Redis 消息系统 stop_stats_scheduler() # 停止统计调度器 + stop_realtime_cleanup() # 停止实时在线状态清理服务 await stop_cache_scheduler() # 停止缓存调度器 await stop_database_cleanup_scheduler() # 停止数据库清理调度器 await download_service.stop_health_check() # 停止下载服务健康检查