修复数据统计问题

This commit is contained in:
咕谷酱
2025-08-22 05:02:24 +08:00
parent fabfbd9f7f
commit 56ae478264
4 changed files with 397 additions and 18 deletions

View File

@@ -84,7 +84,8 @@ async def get_online_history() -> OnlineHistoryResponse:
"""
获取最近24小时在线统计历史
返回过去24小时内每小时的在线用户数和游玩用户数统计
返回过去24小时内每小时的在线用户数和游玩用户数统计
包含当前实时数据作为最新数据点
"""
redis = get_redis()
@@ -107,11 +108,32 @@ async def get_online_history() -> OnlineHistoryResponse:
logger.warning(f"Invalid history data point: {data}, error: {e}")
continue
# 获取当前实时统计信息
current_stats = await get_server_stats()
# 将当前实时数据作为最新的数据点添加到历史中
current_point = OnlineHistoryPoint(
timestamp=current_stats.timestamp,
online_count=current_stats.online_users,
playing_count=current_stats.playing_users
)
# 检查是否需要添加当前数据点
# 如果最新的历史数据超过15分钟则添加当前数据点
should_add_current = True
if history_points:
latest_history = max(history_points, key=lambda x: x.timestamp)
time_diff = (current_stats.timestamp - latest_history.timestamp).total_seconds()
should_add_current = time_diff > 15 * 60 # 15分钟
if should_add_current:
history_points.append(current_point)
# 按时间排序(最新的在前)
history_points.sort(key=lambda x: x.timestamp, reverse=True)
# 获取当前统计信息
current_stats = await get_server_stats()
# 限制到最多48个数据点24小时
history_points = history_points[:48]
return OnlineHistoryResponse(
history=history_points,
@@ -126,6 +148,28 @@ async def get_online_history() -> OnlineHistoryResponse:
current_stats=current_stats
)
@router.get("/stats/realtime", tags=["统计"])
async def get_realtime_stats():
"""
获取实时统计数据
返回包含当前区间统计的增强实时数据
"""
try:
from app.service.interval_stats import get_enhanced_current_stats
return await get_enhanced_current_stats()
except Exception as e:
logger.error(f"Error getting realtime stats: {e}")
# 回退到基础统计
stats = await get_server_stats()
return {
"registered_users": stats.registered_users,
"online_users": stats.online_users,
"playing_users": stats.playing_users,
"timestamp": stats.timestamp.isoformat(),
"interval_data": None
}
async def _get_registered_users_count(redis) -> int:
"""获取注册用户总数(从缓存)"""
try:
@@ -180,7 +224,16 @@ async def add_online_user(user_id: int) -> None:
redis_async = get_redis()
try:
await _redis_exec(redis_sync.sadd, REDIS_ONLINE_USERS_KEY, str(user_id))
await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3600) # 1小时过期
# 检查key是否已有过期时间如果没有则设置3小时过期
ttl = await redis_async.ttl(REDIS_ONLINE_USERS_KEY)
if ttl <= 0: # -1表示永不过期-2表示不存在0表示已过期
await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3 * 3600) # 3小时过期
logger.debug(f"Added online user {user_id}")
# 立即更新当前区间统计
from app.service.interval_stats import update_user_activity_stats
asyncio.create_task(update_user_activity_stats())
except Exception as e:
logger.error(f"Error adding online user {user_id}: {e}")
@@ -199,7 +252,16 @@ async def add_playing_user(user_id: int) -> None:
redis_async = get_redis()
try:
await _redis_exec(redis_sync.sadd, REDIS_PLAYING_USERS_KEY, str(user_id))
await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3600) # 1小时过期
# 检查key是否已有过期时间如果没有则设置3小时过期
ttl = await redis_async.ttl(REDIS_PLAYING_USERS_KEY)
if ttl <= 0: # -1表示永不过期-2表示不存在0表示已过期
await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3 * 3600) # 3小时过期
logger.debug(f"Added playing user {user_id}")
# 立即更新当前区间统计
from app.service.interval_stats import update_user_activity_stats
asyncio.create_task(update_user_activity_stats())
except Exception as e:
logger.error(f"Error adding playing user {user_id}: {e}")
@@ -216,22 +278,26 @@ async def record_hourly_stats() -> None:
redis_sync = get_redis_message()
redis_async = get_redis()
try:
# 先确保Redis连接正常
await redis_async.ping()
online_count = await _get_online_users_count(redis_async)
playing_count = await _get_playing_users_count(redis_async)
current_time = datetime.utcnow()
history_point = {
"timestamp": datetime.utcnow().isoformat(),
"timestamp": current_time.isoformat(),
"online_count": online_count,
"playing_count": playing_count
}
# 添加到历史记录
await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point))
# 只保留48个数据点
# 只保留48个数据点24小时每30分钟一个点
await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47)
# 设置过期时间为25小时
await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 25 * 3600)
# 设置过期时间为26小时,确保有足够缓冲
await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600)
logger.debug(f"Recorded hourly stats: online={online_count}, playing={playing_count}")
logger.info(f"Recorded hourly stats: online={online_count}, playing={playing_count} at {current_time.strftime('%H:%M:%S')}")
except Exception as e:
logger.error(f"Error recording hourly stats: {e}")

View File

@@ -0,0 +1,148 @@
from __future__ import annotations
import json
from datetime import datetime, timedelta
from typing import Dict, Optional
from app.dependencies.database import get_redis, get_redis_message
from app.log import logger
from app.router.v2.stats import (
REDIS_ONLINE_HISTORY_KEY,
_get_online_users_count,
_get_playing_users_count,
_redis_exec
)
# Redis key for current interval stats
CURRENT_INTERVAL_STATS_KEY = "server:current_interval_stats"
class IntervalStatsManager:
"""区间统计管理器 - 管理当前30分钟区间的实时统计"""
@staticmethod
def _get_current_interval_key() -> str:
"""获取当前30分钟区间的唯一标识"""
now = datetime.utcnow()
# 将时间对齐到30分钟区间
interval_start = now.replace(minute=(now.minute // 30) * 30, second=0, microsecond=0)
return f"{CURRENT_INTERVAL_STATS_KEY}:{interval_start.strftime('%Y%m%d_%H%M')}"
@staticmethod
async def update_current_interval() -> None:
"""更新当前区间的统计数据"""
redis_sync = get_redis_message()
redis_async = get_redis()
try:
# 获取当前在线和游玩用户数
online_count = await _get_online_users_count(redis_async)
playing_count = await _get_playing_users_count(redis_async)
current_time = datetime.utcnow()
interval_key = IntervalStatsManager._get_current_interval_key()
# 准备区间统计数据
interval_stats = {
"timestamp": current_time.isoformat(),
"online_count": online_count,
"playing_count": playing_count,
"last_updated": current_time.isoformat()
}
# 存储当前区间统计
await _redis_exec(redis_sync.set, interval_key, json.dumps(interval_stats))
await redis_async.expire(interval_key, 35 * 60) # 35分钟过期确保覆盖整个区间
logger.debug(f"Updated current interval stats: online={online_count}, playing={playing_count}")
except Exception as e:
logger.error(f"Error updating current interval stats: {e}")
@staticmethod
async def get_current_interval_stats() -> Optional[Dict]:
"""获取当前区间的统计数据"""
redis_sync = get_redis_message()
try:
interval_key = IntervalStatsManager._get_current_interval_key()
stats_data = await _redis_exec(redis_sync.get, interval_key)
if stats_data:
return json.loads(stats_data)
return None
except Exception as e:
logger.error(f"Error getting current interval stats: {e}")
return None
@staticmethod
async def finalize_current_interval() -> bool:
"""完成当前区间统计,将其添加到历史记录中"""
redis_sync = get_redis_message()
redis_async = get_redis()
try:
# 获取当前区间的统计
current_stats = await IntervalStatsManager.get_current_interval_stats()
if not current_stats:
# 如果没有当前区间数据,使用实时数据
online_count = await _get_online_users_count(redis_async)
playing_count = await _get_playing_users_count(redis_async)
current_time = datetime.utcnow()
current_stats = {
"timestamp": current_time.isoformat(),
"online_count": online_count,
"playing_count": playing_count
}
# 调整时间戳到区间结束时间
now = datetime.utcnow()
interval_end = now.replace(minute=(now.minute // 30) * 30, second=0, microsecond=0)
if now.minute % 30 != 0 or now.second != 0:
interval_end += timedelta(minutes=30)
history_point = {
"timestamp": interval_end.isoformat(),
"online_count": current_stats["online_count"],
"playing_count": current_stats["playing_count"]
}
# 添加到历史记录
await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point))
# 只保留48个数据点24小时每30分钟一个点
await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47)
# 设置过期时间为26小时确保有足够缓冲
await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600)
logger.info(f"Finalized interval stats: online={current_stats['online_count']}, playing={current_stats['playing_count']} at {interval_end.strftime('%H:%M:%S')}")
return True
except Exception as e:
logger.error(f"Error finalizing current interval: {e}")
return False
# 便捷函数
async def update_user_activity_stats() -> None:
"""在用户活动时更新统计(登录、开始游玩等)"""
await IntervalStatsManager.update_current_interval()
async def get_enhanced_current_stats() -> Dict:
"""获取增强的当前统计,包含区间数据"""
from app.router.v2.stats import get_server_stats
# 获取基础统计
current_stats = await get_server_stats()
# 获取区间统计
interval_stats = await IntervalStatsManager.get_current_interval_stats()
result = {
"registered_users": current_stats.registered_users,
"online_users": current_stats.online_users,
"playing_users": current_stats.playing_users,
"timestamp": current_stats.timestamp.isoformat(),
"interval_data": interval_stats
}
return result

View File

@@ -0,0 +1,81 @@
from __future__ import annotations
from datetime import datetime, timedelta
from app.dependencies.database import get_redis, get_redis_message
from app.log import logger
from app.router.v2.stats import REDIS_ONLINE_USERS_KEY, REDIS_PLAYING_USERS_KEY, _redis_exec
async def cleanup_stale_online_users() -> tuple[int, int]:
"""清理过期的在线和游玩用户,返回清理的用户数"""
redis_sync = get_redis_message()
redis_async = get_redis()
online_cleaned = 0
playing_cleaned = 0
try:
# 获取所有在线用户
online_users = await _redis_exec(redis_sync.smembers, REDIS_ONLINE_USERS_KEY)
playing_users = await _redis_exec(redis_sync.smembers, REDIS_PLAYING_USERS_KEY)
# 检查在线用户的最后活动时间
current_time = datetime.utcnow()
stale_threshold = current_time - timedelta(hours=2) # 2小时无活动视为过期
# 对于在线用户我们检查metadata在线标记
stale_online_users = []
for user_id in online_users:
user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id)
metadata_key = f"metadata:online:{user_id_str}"
# 如果metadata标记不存在说明用户已经离线
if not await redis_async.exists(metadata_key):
stale_online_users.append(user_id_str)
# 清理过期的在线用户
if stale_online_users:
await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, *stale_online_users)
online_cleaned = len(stale_online_users)
logger.info(f"Cleaned {online_cleaned} stale online users")
# 对于游玩用户我们也检查对应的spectator状态
stale_playing_users = []
for user_id in playing_users:
user_id_str = user_id.decode() if isinstance(user_id, bytes) else str(user_id)
# 如果用户不在在线用户列表中,说明已经离线,也应该从游玩列表中移除
if user_id_str in stale_online_users or user_id_str not in [
u.decode() if isinstance(u, bytes) else str(u) for u in online_users
]:
stale_playing_users.append(user_id_str)
# 清理过期的游玩用户
if stale_playing_users:
await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, *stale_playing_users)
playing_cleaned = len(stale_playing_users)
logger.info(f"Cleaned {playing_cleaned} stale playing users")
except Exception as e:
logger.error(f"Error cleaning stale users: {e}")
return online_cleaned, playing_cleaned
async def refresh_redis_key_expiry() -> None:
"""刷新Redis键的过期时间防止数据丢失"""
redis_async = get_redis()
try:
# 刷新在线用户key的过期时间
if await redis_async.exists(REDIS_ONLINE_USERS_KEY):
await redis_async.expire(REDIS_ONLINE_USERS_KEY, 6 * 3600) # 6小时
# 刷新游玩用户key的过期时间
if await redis_async.exists(REDIS_PLAYING_USERS_KEY):
await redis_async.expire(REDIS_PLAYING_USERS_KEY, 6 * 3600) # 6小时
logger.debug("Refreshed Redis key expiry times")
except Exception as e:
logger.error(f"Error refreshing Redis key expiry: {e}")

View File

@@ -1,10 +1,12 @@
from __future__ import annotations
import asyncio
from datetime import datetime
from datetime import datetime, timedelta
from app.log import logger
from app.router.v2.stats import record_hourly_stats, update_registered_users_count
from app.service.stats_cleanup import cleanup_stale_online_users, refresh_redis_key_expiry
from app.service.interval_stats import IntervalStatsManager, update_user_activity_stats
class StatsScheduler:
@@ -14,6 +16,7 @@ class StatsScheduler:
self._running = False
self._stats_task: asyncio.Task | None = None
self._registered_task: asyncio.Task | None = None
self._cleanup_task: asyncio.Task | None = None
def start(self) -> None:
"""启动调度器"""
@@ -23,6 +26,7 @@ class StatsScheduler:
self._running = True
self._stats_task = asyncio.create_task(self._stats_loop())
self._registered_task = asyncio.create_task(self._registered_users_loop())
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
logger.info("Stats scheduler started")
def stop(self) -> None:
@@ -36,32 +40,112 @@ class StatsScheduler:
self._stats_task.cancel()
if self._registered_task:
self._registered_task.cancel()
if self._cleanup_task:
self._cleanup_task.cancel()
logger.info("Stats scheduler stopped")
async def _stats_loop(self) -> None:
"""统计数据记录循环 - 每30分钟记录一次"""
# 启动时立即记录一次统计数据
try:
await IntervalStatsManager.update_current_interval()
logger.info("Initial interval statistics updated on startup")
except Exception as e:
logger.error(f"Error updating initial interval stats: {e}")
while self._running:
try:
await record_hourly_stats()
logger.debug("Recorded hourly statistics")
# 计算下次记录时间下个30分钟整点
now = datetime.utcnow()
minutes_until_next = 30 - (now.minute % 30)
next_record_time = now.replace(second=0, microsecond=0) + timedelta(minutes=minutes_until_next)
# 计算需要等待的秒数
sleep_seconds = (next_record_time - now).total_seconds()
# 确保至少等待30分钟但不超过31分钟防止时间漂移
sleep_seconds = max(min(sleep_seconds, 31 * 60), 30 * 60)
await asyncio.sleep(sleep_seconds)
if not self._running:
break
# 完成当前区间并记录到历史
success = await IntervalStatsManager.finalize_current_interval()
if success:
logger.info(f"Finalized interval statistics at {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}")
else:
# 如果区间完成失败,使用原有方式记录
await record_hourly_stats()
logger.info(f"Recorded hourly statistics (fallback) at {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}")
# 开始新的区间统计
await IntervalStatsManager.update_current_interval()
except Exception as e:
logger.error(f"Error in stats loop: {e}")
# 等待30分钟
await asyncio.sleep(30 * 60)
# 出错时等待5分钟再重试
await asyncio.sleep(5 * 60)
async def _registered_users_loop(self) -> None:
"""注册用户数更新循环 - 每5分钟更新一次"""
# 启动时立即更新一次注册用户数
try:
await update_registered_users_count()
logger.info("Initial registered users count updated on startup")
except Exception as e:
logger.error(f"Error updating initial registered users count: {e}")
while self._running:
# 等待5分钟
await asyncio.sleep(5 * 60)
if not self._running:
break
try:
await update_registered_users_count()
logger.debug("Updated registered users count")
except Exception as e:
logger.error(f"Error in registered users loop: {e}")
async def _cleanup_loop(self) -> None:
"""清理循环 - 每10分钟清理一次过期用户"""
# 启动时立即执行一次清理
try:
online_cleaned, playing_cleaned = await cleanup_stale_online_users()
if online_cleaned > 0 or playing_cleaned > 0:
logger.info(f"Initial cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users")
# 等待5分钟
await asyncio.sleep(5 * 60)
await refresh_redis_key_expiry()
except Exception as e:
logger.error(f"Error in initial cleanup: {e}")
while self._running:
# 等待10分钟
await asyncio.sleep(10 * 60)
if not self._running:
break
try:
# 清理过期用户
online_cleaned, playing_cleaned = await cleanup_stale_online_users()
if online_cleaned > 0 or playing_cleaned > 0:
logger.info(f"Cleanup: removed {online_cleaned} stale online users, {playing_cleaned} stale playing users")
# 刷新Redis key过期时间
await refresh_redis_key_expiry()
# 更新当前区间统计每10分钟更新一次以保持数据新鲜
await IntervalStatsManager.update_current_interval()
except Exception as e:
logger.error(f"Error in cleanup loop: {e}")
# 出错时等待2分钟再重试
await asyncio.sleep(2 * 60)
# 全局调度器实例