From 56ae478264c2825aa8a5118fab523435db68ae69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 05:02:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=95=B0=E6=8D=AE=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/router/v2/stats.py | 86 ++++++++++++++++--- app/service/interval_stats.py | 148 +++++++++++++++++++++++++++++++++ app/service/stats_cleanup.py | 81 ++++++++++++++++++ app/service/stats_scheduler.py | 100 ++++++++++++++++++++-- 4 files changed, 397 insertions(+), 18 deletions(-) create mode 100644 app/service/interval_stats.py create mode 100644 app/service/stats_cleanup.py diff --git a/app/router/v2/stats.py b/app/router/v2/stats.py index 7c2a774..63927c5 100644 --- a/app/router/v2/stats.py +++ b/app/router/v2/stats.py @@ -84,7 +84,8 @@ async def get_online_history() -> OnlineHistoryResponse: """ 获取最近24小时在线统计历史 - 返回过去24小时内每小时的在线用户数和游玩用户数统计 + 返回过去24小时内每小时的在线用户数和游玩用户数统计, + 包含当前实时数据作为最新数据点 """ redis = get_redis() @@ -107,11 +108,32 @@ async def get_online_history() -> OnlineHistoryResponse: logger.warning(f"Invalid history data point: {data}, error: {e}") continue + # 获取当前实时统计信息 + current_stats = await get_server_stats() + + # 将当前实时数据作为最新的数据点添加到历史中 + current_point = OnlineHistoryPoint( + timestamp=current_stats.timestamp, + online_count=current_stats.online_users, + playing_count=current_stats.playing_users + ) + + # 检查是否需要添加当前数据点 + # 如果最新的历史数据超过15分钟,则添加当前数据点 + should_add_current = True + if history_points: + latest_history = max(history_points, key=lambda x: x.timestamp) + time_diff = (current_stats.timestamp - latest_history.timestamp).total_seconds() + should_add_current = time_diff > 15 * 60 # 15分钟 + + if should_add_current: + history_points.append(current_point) + # 按时间排序(最新的在前) history_points.sort(key=lambda x: x.timestamp, reverse=True) - # 获取当前统计信息 - current_stats = await get_server_stats() + # 限制到最多48个数据点(24小时) + history_points = history_points[:48] return OnlineHistoryResponse( history=history_points, @@ -126,6 +148,28 @@ async def get_online_history() -> OnlineHistoryResponse: current_stats=current_stats ) +@router.get("/stats/realtime", tags=["统计"]) +async def get_realtime_stats(): + """ + 获取实时统计数据 + + 返回包含当前区间统计的增强实时数据 + """ + try: + from app.service.interval_stats import get_enhanced_current_stats + return await get_enhanced_current_stats() + except Exception as e: + logger.error(f"Error getting realtime stats: {e}") + # 回退到基础统计 + stats = await get_server_stats() + return { + "registered_users": stats.registered_users, + "online_users": stats.online_users, + "playing_users": stats.playing_users, + "timestamp": stats.timestamp.isoformat(), + "interval_data": None + } + async def _get_registered_users_count(redis) -> int: """获取注册用户总数(从缓存)""" try: @@ -180,7 +224,16 @@ async def add_online_user(user_id: int) -> None: redis_async = get_redis() try: await _redis_exec(redis_sync.sadd, REDIS_ONLINE_USERS_KEY, str(user_id)) - await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3600) # 1小时过期 + # 检查key是否已有过期时间,如果没有则设置3小时过期 + ttl = await redis_async.ttl(REDIS_ONLINE_USERS_KEY) + if ttl <= 0: # -1表示永不过期,-2表示不存在,0表示已过期 + await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3 * 3600) # 3小时过期 + logger.debug(f"Added online user {user_id}") + + # 立即更新当前区间统计 + from app.service.interval_stats import update_user_activity_stats + asyncio.create_task(update_user_activity_stats()) + except Exception as e: logger.error(f"Error adding online user {user_id}: {e}") @@ -199,7 +252,16 @@ async def add_playing_user(user_id: int) -> None: redis_async = get_redis() try: await _redis_exec(redis_sync.sadd, REDIS_PLAYING_USERS_KEY, str(user_id)) - await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3600) # 1小时过期 + # 检查key是否已有过期时间,如果没有则设置3小时过期 + ttl = await redis_async.ttl(REDIS_PLAYING_USERS_KEY) + if ttl <= 0: # -1表示永不过期,-2表示不存在,0表示已过期 + await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3 * 3600) # 3小时过期 + logger.debug(f"Added playing user {user_id}") + + # 立即更新当前区间统计 + from app.service.interval_stats import update_user_activity_stats + asyncio.create_task(update_user_activity_stats()) + except Exception as e: logger.error(f"Error adding playing user {user_id}: {e}") @@ -216,22 +278,26 @@ async def record_hourly_stats() -> None: redis_sync = get_redis_message() redis_async = get_redis() try: + # 先确保Redis连接正常 + await redis_async.ping() + online_count = await _get_online_users_count(redis_async) playing_count = await _get_playing_users_count(redis_async) + current_time = datetime.utcnow() history_point = { - "timestamp": datetime.utcnow().isoformat(), + "timestamp": current_time.isoformat(), "online_count": online_count, "playing_count": playing_count } # 添加到历史记录 await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)) - # 只保留48个数据点 + # 只保留48个数据点(24小时,每30分钟一个点) await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) - # 设置过期时间为25小时 - await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 25 * 3600) + # 设置过期时间为26小时,确保有足够缓冲 + await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) - logger.debug(f"Recorded hourly stats: online={online_count}, playing={playing_count}") + logger.info(f"Recorded hourly stats: online={online_count}, playing={playing_count} at {current_time.strftime('%H:%M:%S')}") except Exception as e: logger.error(f"Error recording hourly stats: {e}") diff --git a/app/service/interval_stats.py b/app/service/interval_stats.py new file mode 100644 index 0000000..30c23ba --- /dev/null +++ b/app/service/interval_stats.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +import json +from datetime import datetime, timedelta +from typing import Dict, Optional + +from app.dependencies.database import get_redis, get_redis_message +from app.log import logger +from app.router.v2.stats import ( + REDIS_ONLINE_HISTORY_KEY, + _get_online_users_count, + _get_playing_users_count, + _redis_exec +) + +# Redis key for current interval stats +CURRENT_INTERVAL_STATS_KEY = "server:current_interval_stats" + +class IntervalStatsManager: + """区间统计管理器 - 管理当前30分钟区间的实时统计""" + + @staticmethod + def _get_current_interval_key() -> str: + """获取当前30分钟区间的唯一标识""" + now = datetime.utcnow() + # 将时间对齐到30分钟区间 + interval_start = now.replace(minute=(now.minute // 30) * 30, second=0, microsecond=0) + return f"{CURRENT_INTERVAL_STATS_KEY}:{interval_start.strftime('%Y%m%d_%H%M')}" + + @staticmethod + async def update_current_interval() -> None: + """更新当前区间的统计数据""" + redis_sync = get_redis_message() + redis_async = get_redis() + + try: + # 获取当前在线和游玩用户数 + online_count = await _get_online_users_count(redis_async) + playing_count = await _get_playing_users_count(redis_async) + + current_time = datetime.utcnow() + interval_key = IntervalStatsManager._get_current_interval_key() + + # 准备区间统计数据 + interval_stats = { + "timestamp": current_time.isoformat(), + "online_count": online_count, + "playing_count": playing_count, + "last_updated": current_time.isoformat() + } + + # 存储当前区间统计 + await _redis_exec(redis_sync.set, interval_key, json.dumps(interval_stats)) + await redis_async.expire(interval_key, 35 * 60) # 35分钟过期,确保覆盖整个区间 + + logger.debug(f"Updated current interval stats: online={online_count}, playing={playing_count}") + + except Exception as e: + logger.error(f"Error updating current interval stats: {e}") + + @staticmethod + async def get_current_interval_stats() -> Optional[Dict]: + """获取当前区间的统计数据""" + redis_sync = get_redis_message() + + try: + interval_key = IntervalStatsManager._get_current_interval_key() + stats_data = await _redis_exec(redis_sync.get, interval_key) + + if stats_data: + return json.loads(stats_data) + return None + + except Exception as e: + logger.error(f"Error getting current interval stats: {e}") + return None + + @staticmethod + async def finalize_current_interval() -> bool: + """完成当前区间统计,将其添加到历史记录中""" + redis_sync = get_redis_message() + redis_async = get_redis() + + try: + # 获取当前区间的统计 + current_stats = await IntervalStatsManager.get_current_interval_stats() + if not current_stats: + # 如果没有当前区间数据,使用实时数据 + online_count = await _get_online_users_count(redis_async) + playing_count = await _get_playing_users_count(redis_async) + current_time = datetime.utcnow() + + current_stats = { + "timestamp": current_time.isoformat(), + "online_count": online_count, + "playing_count": playing_count + } + + # 调整时间戳到区间结束时间 + now = datetime.utcnow() + interval_end = now.replace(minute=(now.minute // 30) * 30, second=0, microsecond=0) + if now.minute % 30 != 0 or now.second != 0: + interval_end += timedelta(minutes=30) + + history_point = { + "timestamp": interval_end.isoformat(), + "online_count": current_stats["online_count"], + "playing_count": current_stats["playing_count"] + } + + # 添加到历史记录 + await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)) + # 只保留48个数据点(24小时,每30分钟一个点) + await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) + # 设置过期时间为26小时,确保有足够缓冲 + await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) + + logger.info(f"Finalized interval stats: online={current_stats['online_count']}, playing={current_stats['playing_count']} at {interval_end.strftime('%H:%M:%S')}") + return True + + except Exception as e: + logger.error(f"Error finalizing current interval: {e}") + return False + +# 便捷函数 +async def update_user_activity_stats() -> None: + """在用户活动时更新统计(登录、开始游玩等)""" + await IntervalStatsManager.update_current_interval() + +async def get_enhanced_current_stats() -> Dict: + """获取增强的当前统计,包含区间数据""" + from app.router.v2.stats import get_server_stats + + # 获取基础统计 + current_stats = await get_server_stats() + + # 获取区间统计 + interval_stats = await IntervalStatsManager.get_current_interval_stats() + + result = { + "registered_users": current_stats.registered_users, + "online_users": current_stats.online_users, + "playing_users": current_stats.playing_users, + "timestamp": current_stats.timestamp.isoformat(), + "interval_data": interval_stats + } + + return result diff --git a/app/service/stats_cleanup.py b/app/service/stats_cleanup.py new file mode 100644 index 0000000..f87a03a --- /dev/null +++ b/app/service/stats_cleanup.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from app.dependencies.database import get_redis, get_redis_message +from app.log import logger +from app.router.v2.stats import REDIS_ONLINE_USERS_KEY, REDIS_PLAYING_USERS_KEY, _redis_exec + + +async def cleanup_stale_online_users() -> tuple[int, int]: + """清理过期的在线和游玩用户,返回清理的用户数""" + redis_sync = get_redis_message() + redis_async = get_redis() + + online_cleaned = 0 + playing_cleaned = 0 + + try: + # 获取所有在线用户 + online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY) + playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY) + + # 检查在线用户的最后活动时间 + current_time = datetime.utcnow() + stale_threshold = current_time - timedelta(hours=2) # 2小时无活动视为过期 + + # 对于在线用户,我们检查metadata在线标记 + stale_online_users = [] + for user_id in online_users: + user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) + metadata_key = f"metadata:online:{user_id_str}" + + # 如果metadata标记不存在,说明用户已经离线 + if not await redis_async.exists(metadata_key): + stale_online_users.append(user_id_str) + + # 清理过期的在线用户 + if stale_online_users: + await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, *stale_online_users) + online_cleaned = len(stale_online_users) + logger.info(f"Cleaned {online_cleaned} stale online users") + + # 对于游玩用户,我们也检查对应的spectator状态 + stale_playing_users = [] + for user_id in playing_users: + user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id) + + # 如果用户不在在线用户列表中,说明已经离线,也应该从游玩列表中移除 + if user_id_str in stale_online_users or user_id_str not in [ + u.decode() if isinstance(u, bytes) else str(u) for u in online_users + ]: + stale_playing_users.append(user_id_str) + + # 清理过期的游玩用户 + if stale_playing_users: + await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, *stale_playing_users) + playing_cleaned = len(stale_playing_users) + logger.info(f"Cleaned {playing_cleaned} stale playing users") + + except Exception as e: + logger.error(f"Error cleaning stale users: {e}") + + return online_cleaned, playing_cleaned + + +async def refresh_redis_key_expiry() -> None: + """刷新Redis键的过期时间,防止数据丢失""" + redis_async = get_redis() + + try: + # 刷新在线用户key的过期时间 + if await redis_async.exists(REDIS_ONLINE_USERS_KEY): + await redis_async.expire(REDIS_ONLINE_USERS_KEY, 6 * 3600) # 6小时 + + # 刷新游玩用户key的过期时间 + if await redis_async.exists(REDIS_PLAYING_USERS_KEY): + await redis_async.expire(REDIS_PLAYING_USERS_KEY, 6 * 3600) # 6小时 + + logger.debug("Refreshed Redis key expiry times") + + except Exception as e: + logger.error(f"Error refreshing Redis key expiry: {e}") diff --git a/app/service/stats_scheduler.py b/app/service/stats_scheduler.py index fef3081..db32f47 100644 --- a/app/service/stats_scheduler.py +++ b/app/service/stats_scheduler.py @@ -1,10 +1,12 @@ from __future__ import annotations import asyncio -from datetime import datetime +from datetime import datetime, timedelta from app.log import logger from app.router.v2.stats import record_hourly_stats, update_registered_users_count +from app.service.stats_cleanup import cleanup_stale_online_users, refresh_redis_key_expiry +from app.service.interval_stats import IntervalStatsManager, update_user_activity_stats class StatsScheduler: @@ -14,6 +16,7 @@ class StatsScheduler: self._running = False self._stats_task: asyncio.Task | None = None self._registered_task: asyncio.Task | None = None + self._cleanup_task: asyncio.Task | None = None def start(self) -> None: """启动调度器""" @@ -23,6 +26,7 @@ class StatsScheduler: self._running = True self._stats_task = asyncio.create_task(self._stats_loop()) self._registered_task = asyncio.create_task(self._registered_users_loop()) + self._cleanup_task = asyncio.create_task(self._cleanup_loop()) logger.info("Stats scheduler started") def stop(self) -> None: @@ -36,32 +40,112 @@ class StatsScheduler: self._stats_task.cancel() if self._registered_task: self._registered_task.cancel() + if self._cleanup_task: + self._cleanup_task.cancel() logger.info("Stats scheduler stopped") async def _stats_loop(self) -> None: """统计数据记录循环 - 每30分钟记录一次""" + # 启动时立即记录一次统计数据 + try: + await IntervalStatsManager.update_current_interval() + logger.info("Initial interval statistics updated on startup") + except Exception as e: + logger.error(f"Error updating initial interval stats: {e}") + while self._running: try: - await record_hourly_stats() - logger.debug("Recorded hourly statistics") + # 计算下次记录时间(下个30分钟整点) + now = datetime.utcnow() + minutes_until_next = 30 - (now.minute % 30) + next_record_time = now.replace(second=0, microsecond=0) + timedelta(minutes=minutes_until_next) + + # 计算需要等待的秒数 + sleep_seconds = (next_record_time - now).total_seconds() + + # 确保至少等待30分钟,但不超过31分钟(防止时间漂移) + sleep_seconds = max(min(sleep_seconds, 31 * 60), 30 * 60) + + await asyncio.sleep(sleep_seconds) + + if not self._running: + break + + # 完成当前区间并记录到历史 + success = await IntervalStatsManager.finalize_current_interval() + if success: + logger.info(f"Finalized interval statistics at {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}") + else: + # 如果区间完成失败,使用原有方式记录 + await record_hourly_stats() + logger.info(f"Recorded hourly statistics (fallback) at {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}") + + # 开始新的区间统计 + await IntervalStatsManager.update_current_interval() + except Exception as e: logger.error(f"Error in stats loop: {e}") - - # 等待30分钟 - await asyncio.sleep(30 * 60) + # 出错时等待5分钟再重试 + await asyncio.sleep(5 * 60) async def _registered_users_loop(self) -> None: """注册用户数更新循环 - 每5分钟更新一次""" + # 启动时立即更新一次注册用户数 + try: + await update_registered_users_count() + logger.info("Initial registered users count updated on startup") + except Exception as e: + logger.error(f"Error updating initial registered users count: {e}") + while self._running: + # 等待5分钟 + await asyncio.sleep(5 * 60) + + if not self._running: + break + try: await update_registered_users_count() logger.debug("Updated registered users count") except Exception as e: logger.error(f"Error in registered users loop: {e}") + + async def _cleanup_loop(self) -> None: + """清理循环 - 每10分钟清理一次过期用户""" + # 启动时立即执行一次清理 + try: + online_cleaned, playing_cleaned = await cleanup_stale_online_users() + if online_cleaned > 0 or playing_cleaned > 0: + logger.info(f"Initial cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users") - # 等待5分钟 - await asyncio.sleep(5 * 60) + await refresh_redis_key_expiry() + except Exception as e: + logger.error(f"Error in initial cleanup: {e}") + + while self._running: + # 等待10分钟 + await asyncio.sleep(10 * 60) + + if not self._running: + break + + try: + # 清理过期用户 + online_cleaned, playing_cleaned = await cleanup_stale_online_users() + if online_cleaned > 0 or playing_cleaned > 0: + logger.info(f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users") + + # 刷新Redis key过期时间 + await refresh_redis_key_expiry() + + # 更新当前区间统计(每10分钟更新一次以保持数据新鲜) + await IntervalStatsManager.update_current_interval() + + except Exception as e: + logger.error(f"Error in cleanup loop: {e}") + # 出错时等待2分钟再重试 + await asyncio.sleep(2 * 60) # 全局调度器实例