Update stats.py

This commit is contained in:
咕谷酱
2025-08-22 06:01:12 +08:00
parent ce465aa049
commit 23db81469a

View File

@@ -1,15 +1,17 @@
from __future__ import annotations
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from datetime import datetime, timedelta
import json
from typing import Any
from concurrent.futures import ThreadPoolExecutor
from app.dependencies.database import get_redis, get_redis_message
from app.log import logger
from .router import router
from fastapi import APIRouter
from pydantic import BaseModel
# Redis key constants
@@ -21,25 +23,20 @@ REDIS_ONLINE_HISTORY_KEY = "server:online_history"
# 线程池用于同步Redis操作
_executor = ThreadPoolExecutor(max_workers=2)
async def _redis_exec(func, *args, **kwargs):
"""在线程池中执行同步Redis操作"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(_executor, func, *args, **kwargs)
class ServerStats(BaseModel):
"""服务器统计信息响应模型"""
registered_users: int
online_users: int
playing_users: int
timestamp: datetime
class OnlineHistoryPoint(BaseModel):
"""在线历史数据点"""
timestamp: datetime
online_count: int
playing_count: int
@@ -47,14 +44,11 @@ class OnlineHistoryPoint(BaseModel):
peak_playing: int | None = None # 峰值游玩数(增强数据)
total_samples: int | None = None # 采样次数(增强数据)
class OnlineHistoryResponse(BaseModel):
"""24小时在线历史响应模型"""
history: list[OnlineHistoryPoint]
current_stats: ServerStats
@router.get("/stats", response_model=ServerStats, tags=["统计"])
async def get_server_stats() -> ServerStats:
"""
@@ -69,14 +63,14 @@ async def get_server_stats() -> ServerStats:
registered_count, online_count, playing_count = await asyncio.gather(
_get_registered_users_count(redis),
_get_online_users_count(redis),
_get_playing_users_count(redis),
_get_playing_users_count(redis)
)
return ServerStats(
registered_users=registered_count,
online_users=online_count,
playing_users=playing_count,
timestamp=datetime.utcnow(),
timestamp=datetime.utcnow()
)
except Exception as e:
logger.error(f"Error getting server stats: {e}")
@@ -85,10 +79,9 @@ async def get_server_stats() -> ServerStats:
registered_users=0,
online_users=0,
playing_users=0,
timestamp=datetime.utcnow(),
timestamp=datetime.utcnow()
)
@router.get("/stats/history", response_model=OnlineHistoryResponse, tags=["统计"])
async def get_online_history() -> OnlineHistoryResponse:
"""
@@ -100,9 +93,7 @@ async def get_online_history() -> OnlineHistoryResponse:
try:
# 获取历史数据 - 使用同步Redis客户端
redis_sync = get_redis_message()
history_data = await _redis_exec(
redis_sync.lrange, REDIS_ONLINE_HISTORY_KEY, 0, -1
)
history_data = await _redis_exec(redis_sync.lrange, REDIS_ONLINE_HISTORY_KEY, 0, -1)
history_points = []
# 处理历史数据
@@ -110,20 +101,14 @@ async def get_online_history() -> OnlineHistoryResponse:
try:
point_data = json.loads(data)
# 支持新旧格式的历史数据
history_points.append(
OnlineHistoryPoint(
timestamp=datetime.fromisoformat(point_data["timestamp"]),
online_count=point_data["online_count"],
playing_count=point_data["playing_count"],
peak_online=point_data.get("peak_online"), # 新字段,可能不存在
peak_playing=point_data.get(
"peak_playing"
), # 新字段,可能不存在
total_samples=point_data.get(
"total_samples"
), # 新字段,可能不存在
)
)
history_points.append(OnlineHistoryPoint(
timestamp=datetime.fromisoformat(point_data["timestamp"]),
online_count=point_data["online_count"],
playing_count=point_data["playing_count"],
peak_online=point_data.get("peak_online"), # 新字段,可能不存在
peak_playing=point_data.get("peak_playing"), # 新字段,可能不存在
total_samples=point_data.get("total_samples") # 新字段,可能不存在
))
except (json.JSONDecodeError, KeyError, ValueError) as e:
logger.warning(f"Invalid history data point: {data}, error: {e}")
continue
@@ -133,23 +118,17 @@ async def get_online_history() -> OnlineHistoryResponse:
# 如果历史数据为空或者最新数据超过15分钟添加当前数据点
if not history_points or (
history_points
and (
current_stats.timestamp
- max(history_points, key=lambda x: x.timestamp).timestamp
).total_seconds()
> 15 * 60
history_points and
(current_stats.timestamp - max(history_points, key=lambda x: x.timestamp).timestamp).total_seconds() > 15 * 60
):
history_points.append(
OnlineHistoryPoint(
timestamp=current_stats.timestamp,
online_count=current_stats.online_users,
playing_count=current_stats.playing_users,
peak_online=current_stats.online_users, # 当前实时数据作为峰值
peak_playing=current_stats.playing_users,
total_samples=1,
)
)
history_points.append(OnlineHistoryPoint(
timestamp=current_stats.timestamp,
online_count=current_stats.online_users,
playing_count=current_stats.playing_users,
peak_online=current_stats.online_users, # 当前实时数据作为峰值
peak_playing=current_stats.playing_users,
total_samples=1
))
# 按时间排序(最新的在前)
history_points.sort(key=lambda x: x.timestamp, reverse=True)
@@ -158,14 +137,75 @@ async def get_online_history() -> OnlineHistoryResponse:
history_points = history_points[:48]
return OnlineHistoryResponse(
history=history_points, current_stats=current_stats
history=history_points,
current_stats=current_stats
)
except Exception as e:
logger.error(f"Error getting online history: {e}")
# 返回空历史和当前状态
current_stats = await get_server_stats()
return OnlineHistoryResponse(history=[], current_stats=current_stats)
return OnlineHistoryResponse(
history=[],
current_stats=current_stats
)
@router.get("/stats/debug", tags=["统计"])
async def get_stats_debug_info():
"""
获取统计系统调试信息
用于调试时间对齐和区间统计问题
"""
try:
from app.service.enhanced_interval_stats import EnhancedIntervalStatsManager
current_time = datetime.utcnow()
current_interval = await EnhancedIntervalStatsManager.get_current_interval_info()
interval_stats = await EnhancedIntervalStatsManager.get_current_interval_stats()
# 获取Redis中的实际数据
redis_sync = get_redis_message()
online_key = f"server:interval_online_users:{current_interval.interval_key}"
playing_key = f"server:interval_playing_users:{current_interval.interval_key}"
online_users_raw = await _redis_exec(redis_sync.smembers, online_key)
playing_users_raw = await _redis_exec(redis_sync.smembers, playing_key)
online_users = [int(uid.decode() if isinstance(uid, bytes) else uid) for uid in online_users_raw]
playing_users = [int(uid.decode() if isinstance(uid, bytes) else uid) for uid in playing_users_raw]
return {
"current_time": current_time.isoformat(),
"current_interval": {
"start_time": current_interval.start_time.isoformat(),
"end_time": current_interval.end_time.isoformat(),
"key": current_interval.interval_key,
"is_current": current_interval.is_current(),
"minutes_remaining": int((current_interval.end_time - current_time).total_seconds() / 60),
"seconds_remaining": int((current_interval.end_time - current_time).total_seconds()),
"progress_percentage": round((1 - (current_interval.end_time - current_time).total_seconds() / (30 * 60)) * 100, 1)
},
"interval_statistics": interval_stats.to_dict() if interval_stats else None,
"redis_data": {
"online_users": online_users,
"playing_users": playing_users,
"online_count": len(online_users),
"playing_count": len(playing_users)
},
"system_status": {
"stats_system": "enhanced_interval_stats",
"data_alignment": "30_minute_boundaries",
"real_time_updates": True,
"auto_24h_fill": True
}
}
except Exception as e:
logger.error(f"Error getting debug info: {e}")
return {
"error": "Failed to retrieve debug information",
"message": str(e)
}
async def _get_registered_users_count(redis) -> int:
"""获取注册用户总数(从缓存)"""
@@ -176,7 +216,6 @@ async def _get_registered_users_count(redis) -> int:
logger.error(f"Error getting registered users count: {e}")
return 0
async def _get_online_users_count(redis) -> int:
"""获取当前在线用户数"""
try:
@@ -186,7 +225,6 @@ async def _get_online_users_count(redis) -> int:
logger.error(f"Error getting online users count: {e}")
return 0
async def _get_playing_users_count(redis) -> int:
"""获取当前游玩用户数"""
try:
@@ -196,15 +234,13 @@ async def _get_playing_users_count(redis) -> int:
logger.error(f"Error getting playing users count: {e}")
return 0
# 统计更新功能
async def update_registered_users_count() -> None:
"""更新注册用户数缓存"""
from app.const import BANCHOBOT_ID
from app.database import User
from app.dependencies.database import with_db
from sqlmodel import func, select
from app.database import User
from app.const import BANCHOBOT_ID
from sqlmodel import select, func
redis = get_redis()
try:
@@ -219,7 +255,6 @@ async def update_registered_users_count() -> None:
except Exception as e:
logger.error(f"Error updating registered users count: {e}")
async def add_online_user(user_id: int) -> None:
"""添加在线用户"""
redis_sync = get_redis_message()
@@ -234,13 +269,11 @@ async def add_online_user(user_id: int) -> None:
# 立即更新当前区间统计
from app.service.enhanced_interval_stats import update_user_activity_in_interval
asyncio.create_task(update_user_activity_in_interval(user_id, is_playing=False))
except Exception as e:
logger.error(f"Error adding online user {user_id}: {e}")
async def remove_online_user(user_id: int) -> None:
"""移除在线用户"""
redis_sync = get_redis_message()
@@ -250,7 +283,6 @@ async def remove_online_user(user_id: int) -> None:
except Exception as e:
logger.error(f"Error removing online user {user_id}: {e}")
async def add_playing_user(user_id: int) -> None:
"""添加游玩用户"""
redis_sync = get_redis_message()
@@ -265,13 +297,11 @@ async def add_playing_user(user_id: int) -> None:
# 立即更新当前区间统计
from app.service.enhanced_interval_stats import update_user_activity_in_interval
asyncio.create_task(update_user_activity_in_interval(user_id, is_playing=True))
except Exception as e:
logger.error(f"Error adding playing user {user_id}: {e}")
async def remove_playing_user(user_id: int) -> None:
"""移除游玩用户"""
redis_sync = get_redis_message()
@@ -280,7 +310,6 @@ async def remove_playing_user(user_id: int) -> None:
except Exception as e:
logger.error(f"Error removing playing user {user_id}: {e}")
async def record_hourly_stats() -> None:
"""记录统计数据 - 简化版本主要作为fallback使用"""
redis_sync = get_redis_message()
@@ -299,20 +328,16 @@ async def record_hourly_stats() -> None:
"playing_count": playing_count,
"peak_online": online_count,
"peak_playing": playing_count,
"total_samples": 1,
"total_samples": 1
}
# 添加到历史记录
await _redis_exec(
redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)
)
await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point))
# 只保留48个数据点24小时每30分钟一个点
await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47)
# 设置过期时间为26小时确保有足够缓冲
await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600)
logger.info(
f"Recorded fallback stats: online={online_count}, playing={playing_count} at {current_time.strftime('%H:%M:%S')}"
)
logger.info(f"Recorded fallback stats: online={online_count}, playing={playing_count} at {current_time.strftime('%H:%M:%S')}")
except Exception as e:
logger.error(f"Error recording fallback stats: {e}")