From 23db81469a77e49a0c1129adc25f534405703309 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 06:01:12 +0800 Subject: [PATCH] Update stats.py --- app/router/v2/stats.py | 205 +++++++++++++++++++++++------------------ 1 file changed, 115 insertions(+), 90 deletions(-) diff --git a/app/router/v2/stats.py b/app/router/v2/stats.py index b79dd54..669e924 100644 --- a/app/router/v2/stats.py +++ b/app/router/v2/stats.py @@ -1,45 +1,42 @@ from __future__ import annotations import asyncio -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime +from datetime import datetime, timedelta import json +from typing import Any +from concurrent.futures import ThreadPoolExecutor from app.dependencies.database import get_redis, get_redis_message from app.log import logger from .router import router +from fastapi import APIRouter from pydantic import BaseModel # Redis key constants REDIS_ONLINE_USERS_KEY = "server:online_users" -REDIS_PLAYING_USERS_KEY = "server:playing_users" +REDIS_PLAYING_USERS_KEY = "server:playing_users" REDIS_REGISTERED_USERS_KEY = "server:registered_users" REDIS_ONLINE_HISTORY_KEY = "server:online_history" # 线程池用于同步Redis操作 _executor = ThreadPoolExecutor(max_workers=2) - async def _redis_exec(func, *args, **kwargs): """在线程池中执行同步Redis操作""" loop = asyncio.get_event_loop() return await loop.run_in_executor(_executor, func, *args, **kwargs) - class ServerStats(BaseModel): """服务器统计信息响应模型""" - registered_users: int online_users: int playing_users: int timestamp: datetime - class OnlineHistoryPoint(BaseModel): """在线历史数据点""" - timestamp: datetime online_count: int playing_count: int @@ -47,36 +44,33 @@ class OnlineHistoryPoint(BaseModel): peak_playing: int | None = None # 峰值游玩数(增强数据) total_samples: int | None = None # 采样次数(增强数据) - class OnlineHistoryResponse(BaseModel): """24小时在线历史响应模型""" - history: list[OnlineHistoryPoint] current_stats: ServerStats - @router.get("/stats", response_model=ServerStats, tags=["统计"]) async def get_server_stats() -> ServerStats: """ 获取服务器实时统计信息 - + 返回服务器注册用户数、在线用户数、正在游玩用户数等实时统计信息 """ redis = get_redis() - + try: # 并行获取所有统计数据 registered_count, online_count, playing_count = await asyncio.gather( _get_registered_users_count(redis), _get_online_users_count(redis), - _get_playing_users_count(redis), + _get_playing_users_count(redis) ) - + return ServerStats( registered_users=registered_count, online_users=online_count, playing_users=playing_count, - timestamp=datetime.utcnow(), + timestamp=datetime.utcnow() ) except Exception as e: logger.error(f"Error getting server stats: {e}") @@ -85,87 +79,133 @@ async def get_server_stats() -> ServerStats: registered_users=0, online_users=0, playing_users=0, - timestamp=datetime.utcnow(), + timestamp=datetime.utcnow() ) - @router.get("/stats/history", response_model=OnlineHistoryResponse, tags=["统计"]) async def get_online_history() -> OnlineHistoryResponse: """ 获取最近24小时在线统计历史 - + 返回过去24小时内每小时的在线用户数和游玩用户数统计, 包含当前实时数据作为最新数据点 """ try: # 获取历史数据 - 使用同步Redis客户端 redis_sync = get_redis_message() - history_data = await _redis_exec( - redis_sync.lrange, REDIS_ONLINE_HISTORY_KEY, 0, -1 - ) + history_data = await _redis_exec(redis_sync.lrange, REDIS_ONLINE_HISTORY_KEY, 0, -1) history_points = [] - + # 处理历史数据 for data in history_data: try: point_data = json.loads(data) # 支持新旧格式的历史数据 - history_points.append( - OnlineHistoryPoint( - timestamp=datetime.fromisoformat(point_data["timestamp"]), - online_count=point_data["online_count"], - playing_count=point_data["playing_count"], - peak_online=point_data.get("peak_online"), # 新字段,可能不存在 - peak_playing=point_data.get( - "peak_playing" - ), # 新字段,可能不存在 - total_samples=point_data.get( - "total_samples" - ), # 新字段,可能不存在 - ) - ) + history_points.append(OnlineHistoryPoint( + timestamp=datetime.fromisoformat(point_data["timestamp"]), + online_count=point_data["online_count"], + playing_count=point_data["playing_count"], + peak_online=point_data.get("peak_online"), # 新字段,可能不存在 + peak_playing=point_data.get("peak_playing"), # 新字段,可能不存在 + total_samples=point_data.get("total_samples") # 新字段,可能不存在 + )) except (json.JSONDecodeError, KeyError, ValueError) as e: logger.warning(f"Invalid history data point: {data}, error: {e}") continue - + # 获取当前实时统计信息 current_stats = await get_server_stats() - + # 如果历史数据为空或者最新数据超过15分钟,添加当前数据点 if not history_points or ( - history_points - and ( - current_stats.timestamp - - max(history_points, key=lambda x: x.timestamp).timestamp - ).total_seconds() - > 15 * 60 + history_points and + (current_stats.timestamp - max(history_points, key=lambda x: x.timestamp).timestamp).total_seconds() > 15 * 60 ): - history_points.append( - OnlineHistoryPoint( - timestamp=current_stats.timestamp, - online_count=current_stats.online_users, - playing_count=current_stats.playing_users, - peak_online=current_stats.online_users, # 当前实时数据作为峰值 - peak_playing=current_stats.playing_users, - total_samples=1, - ) - ) - + history_points.append(OnlineHistoryPoint( + timestamp=current_stats.timestamp, + online_count=current_stats.online_users, + playing_count=current_stats.playing_users, + peak_online=current_stats.online_users, # 当前实时数据作为峰值 + peak_playing=current_stats.playing_users, + total_samples=1 + )) + # 按时间排序(最新的在前) history_points.sort(key=lambda x: x.timestamp, reverse=True) - + # 限制到最多48个数据点(24小时) history_points = history_points[:48] - + return OnlineHistoryResponse( - history=history_points, current_stats=current_stats + history=history_points, + current_stats=current_stats ) except Exception as e: logger.error(f"Error getting online history: {e}") # 返回空历史和当前状态 current_stats = await get_server_stats() - return OnlineHistoryResponse(history=[], current_stats=current_stats) + return OnlineHistoryResponse( + history=[], + current_stats=current_stats + ) +@router.get("/stats/debug", tags=["统计"]) +async def get_stats_debug_info(): + """ + 获取统计系统调试信息 + + 用于调试时间对齐和区间统计问题 + """ + try: + from app.service.enhanced_interval_stats import EnhancedIntervalStatsManager + + current_time = datetime.utcnow() + current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() + interval_stats = await EnhancedIntervalStatsManager.get_current_interval_stats() + + # 获取Redis中的实际数据 + redis_sync = get_redis_message() + + online_key = f"server:interval_online_users:{current_interval.interval_key}" + playing_key = f"server:interval_playing_users:{current_interval.interval_key}" + + online_users_raw = await _redis_exec(redis_sync.smembers, online_key) + playing_users_raw = await _redis_exec(redis_sync.smembers, playing_key) + + online_users = [int(uid.decode() if isinstance(uid, bytes) else uid) for uid in online_users_raw] + playing_users = [int(uid.decode() if isinstance(uid, bytes) else uid) for uid in playing_users_raw] + + return { + "current_time": current_time.isoformat(), + "current_interval": { + "start_time": current_interval.start_time.isoformat(), + "end_time": current_interval.end_time.isoformat(), + "key": current_interval.interval_key, + "is_current": current_interval.is_current(), + "minutes_remaining": int((current_interval.end_time - current_time).total_seconds() / 60), + "seconds_remaining": int((current_interval.end_time - current_time).total_seconds()), + "progress_percentage": round((1 - (current_interval.end_time - current_time).total_seconds() / (30 * 60)) * 100, 1) + }, + "interval_statistics": interval_stats.to_dict() if interval_stats else None, + "redis_data": { + "online_users": online_users, + "playing_users": playing_users, + "online_count": len(online_users), + "playing_count": len(playing_users) + }, + "system_status": { + "stats_system": "enhanced_interval_stats", + "data_alignment": "30_minute_boundaries", + "real_time_updates": True, + "auto_24h_fill": True + } + } + except Exception as e: + logger.error(f"Error getting debug info: {e}") + return { + "error": "Failed to retrieve debug information", + "message": str(e) + } async def _get_registered_users_count(redis) -> int: """获取注册用户总数(从缓存)""" @@ -176,7 +216,6 @@ async def _get_registered_users_count(redis) -> int: logger.error(f"Error getting registered users count: {e}") return 0 - async def _get_online_users_count(redis) -> int: """获取当前在线用户数""" try: @@ -186,7 +225,6 @@ async def _get_online_users_count(redis) -> int: logger.error(f"Error getting online users count: {e}") return 0 - async def _get_playing_users_count(redis) -> int: """获取当前游玩用户数""" try: @@ -196,16 +234,14 @@ async def _get_playing_users_count(redis) -> int: logger.error(f"Error getting playing users count: {e}") return 0 - # 统计更新功能 async def update_registered_users_count() -> None: """更新注册用户数缓存""" - from app.const import BANCHOBOT_ID - from app.database import User from app.dependencies.database import with_db - - from sqlmodel import func, select - + from app.database import User + from app.const import BANCHOBOT_ID + from sqlmodel import select, func + redis = get_redis() try: async with with_db() as db: @@ -219,7 +255,6 @@ async def update_registered_users_count() -> None: except Exception as e: logger.error(f"Error updating registered users count: {e}") - async def add_online_user(user_id: int) -> None: """添加在线用户""" redis_sync = get_redis_message() @@ -231,16 +266,14 @@ async def add_online_user(user_id: int) -> None: if ttl <= 0: # -1表示永不过期,-2表示不存在,0表示已过期 await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3 * 3600) # 3小时过期 logger.debug(f"Added online user {user_id}") - + # 立即更新当前区间统计 from app.service.enhanced_interval_stats import update_user_activity_in_interval - asyncio.create_task(update_user_activity_in_interval(user_id, is_playing=False)) - + except Exception as e: logger.error(f"Error adding online user {user_id}: {e}") - async def remove_online_user(user_id: int) -> None: """移除在线用户""" redis_sync = get_redis_message() @@ -250,7 +283,6 @@ async def remove_online_user(user_id: int) -> None: except Exception as e: logger.error(f"Error removing online user {user_id}: {e}") - async def add_playing_user(user_id: int) -> None: """添加游玩用户""" redis_sync = get_redis_message() @@ -262,16 +294,14 @@ async def add_playing_user(user_id: int) -> None: if ttl <= 0: # -1表示永不过期,-2表示不存在,0表示已过期 await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3 * 3600) # 3小时过期 logger.debug(f"Added playing user {user_id}") - + # 立即更新当前区间统计 from app.service.enhanced_interval_stats import update_user_activity_in_interval - asyncio.create_task(update_user_activity_in_interval(user_id, is_playing=True)) - + except Exception as e: logger.error(f"Error adding playing user {user_id}: {e}") - async def remove_playing_user(user_id: int) -> None: """移除游玩用户""" redis_sync = get_redis_message() @@ -280,7 +310,6 @@ async def remove_playing_user(user_id: int) -> None: except Exception as e: logger.error(f"Error removing playing user {user_id}: {e}") - async def record_hourly_stats() -> None: """记录统计数据 - 简化版本,主要作为fallback使用""" redis_sync = get_redis_message() @@ -288,10 +317,10 @@ async def record_hourly_stats() -> None: try: # 先确保Redis连接正常 await redis_async.ping() - + online_count = await _get_online_users_count(redis_async) playing_count = await _get_playing_users_count(redis_async) - + current_time = datetime.utcnow() history_point = { "timestamp": current_time.isoformat(), @@ -299,20 +328,16 @@ async def record_hourly_stats() -> None: "playing_count": playing_count, "peak_online": online_count, "peak_playing": playing_count, - "total_samples": 1, + "total_samples": 1 } - + # 添加到历史记录 - await _redis_exec( - redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point) - ) + await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)) # 只保留48个数据点(24小时,每30分钟一个点) await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) # 设置过期时间为26小时,确保有足够缓冲 await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) - - logger.info( - f"Recorded fallback stats: online={online_count}, playing={playing_count} at {current_time.strftime('%H:%M:%S')}" - ) + + logger.info(f"Recorded fallback stats: online={online_count}, playing={playing_count} at {current_time.strftime('%H:%M:%S')}") except Exception as e: logger.error(f"Error recording fallback stats: {e}")