diff --git a/app/router/v2/stats.py b/app/router/v2/stats.py index 63927c5..e247fd3 100644 --- a/app/router/v2/stats.py +++ b/app/router/v2/stats.py @@ -99,6 +99,7 @@ async def get_online_history() -> OnlineHistoryResponse: for data in history_data: try: point_data = json.loads(data) + # 支持新旧格式的历史数据 history_points.append(OnlineHistoryPoint( timestamp=datetime.fromisoformat(point_data["timestamp"]), online_count=point_data["online_count"], @@ -111,22 +112,18 @@ async def get_online_history() -> OnlineHistoryResponse: # 获取当前实时统计信息 current_stats = await get_server_stats() - # 将当前实时数据作为最新的数据点添加到历史中 + # 将当前实时数据作为最新的数据点添加到历史中(如果需要) current_point = OnlineHistoryPoint( timestamp=current_stats.timestamp, online_count=current_stats.online_users, playing_count=current_stats.playing_users ) - # 检查是否需要添加当前数据点 - # 如果最新的历史数据超过15分钟,则添加当前数据点 - should_add_current = True - if history_points: - latest_history = max(history_points, key=lambda x: x.timestamp) - time_diff = (current_stats.timestamp - latest_history.timestamp).total_seconds() - should_add_current = time_diff > 15 * 60 # 15分钟 - - if should_add_current: + # 如果历史数据为空或者最新数据超过15分钟,添加当前数据点 + if not history_points or ( + history_points and + (current_stats.timestamp - max(history_points, key=lambda x: x.timestamp).timestamp).total_seconds() > 15 * 60 + ): history_points.append(current_point) # 按时间排序(最新的在前) @@ -148,28 +145,6 @@ async def get_online_history() -> OnlineHistoryResponse: current_stats=current_stats ) -@router.get("/stats/realtime", tags=["统计"]) -async def get_realtime_stats(): - """ - 获取实时统计数据 - - 返回包含当前区间统计的增强实时数据 - """ - try: - from app.service.interval_stats import get_enhanced_current_stats - return await get_enhanced_current_stats() - except Exception as e: - logger.error(f"Error getting realtime stats: {e}") - # 回退到基础统计 - stats = await get_server_stats() - return { - "registered_users": stats.registered_users, - "online_users": stats.online_users, - "playing_users": stats.playing_users, - "timestamp": stats.timestamp.isoformat(), - "interval_data": None - } - async def _get_registered_users_count(redis) -> int: """获取注册用户总数(从缓存)""" try: @@ -231,8 +206,8 @@ async def add_online_user(user_id: int) -> None: logger.debug(f"Added online user {user_id}") # 立即更新当前区间统计 - from app.service.interval_stats import update_user_activity_stats - asyncio.create_task(update_user_activity_stats()) + from app.service.enhanced_interval_stats import update_user_activity_in_interval + asyncio.create_task(update_user_activity_in_interval(user_id, is_playing=False)) except Exception as e: logger.error(f"Error adding online user {user_id}: {e}") @@ -259,8 +234,8 @@ async def add_playing_user(user_id: int) -> None: logger.debug(f"Added playing user {user_id}") # 立即更新当前区间统计 - from app.service.interval_stats import update_user_activity_stats - asyncio.create_task(update_user_activity_stats()) + from app.service.enhanced_interval_stats import update_user_activity_in_interval + asyncio.create_task(update_user_activity_in_interval(user_id, is_playing=True)) except Exception as e: logger.error(f"Error adding playing user {user_id}: {e}") @@ -274,7 +249,7 @@ async def remove_playing_user(user_id: int) -> None: logger.error(f"Error removing playing user {user_id}: {e}") async def record_hourly_stats() -> None: - """记录每小时统计数据""" + """记录统计数据 - 简化版本,主要作为fallback使用""" redis_sync = get_redis_message() redis_async = get_redis() try: @@ -288,7 +263,10 @@ async def record_hourly_stats() -> None: history_point = { "timestamp": current_time.isoformat(), "online_count": online_count, - "playing_count": playing_count + "playing_count": playing_count, + "peak_online": online_count, + "peak_playing": playing_count, + "total_samples": 1 } # 添加到历史记录 @@ -298,6 +276,6 @@ async def record_hourly_stats() -> None: # 设置过期时间为26小时,确保有足够缓冲 await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) - logger.info(f"Recorded hourly stats: online={online_count}, playing={playing_count} at {current_time.strftime('%H:%M:%S')}") + logger.info(f"Recorded fallback stats: online={online_count}, playing={playing_count} at {current_time.strftime('%H:%M:%S')}") except Exception as e: - logger.error(f"Error recording hourly stats: {e}") + logger.error(f"Error recording fallback stats: {e}") diff --git a/app/service/enhanced_interval_stats.py b/app/service/enhanced_interval_stats.py new file mode 100644 index 0000000..bb67dbc --- /dev/null +++ b/app/service/enhanced_interval_stats.py @@ -0,0 +1,445 @@ +""" +重构的区间统计系统 - 真正统计半小时区间内的用户活跃情况 +""" + +from __future__ import annotations + +import json +import asyncio +from datetime import datetime, timedelta +from typing import Dict, Set, Optional, List +from dataclasses import dataclass, asdict + +from app.dependencies.database import get_redis, get_redis_message +from app.log import logger +from app.router.v2.stats import ( + REDIS_ONLINE_HISTORY_KEY, + _get_online_users_count, + _get_playing_users_count, + _redis_exec +) + +# Redis keys for interval statistics +INTERVAL_STATS_BASE_KEY = "server:interval_stats" +INTERVAL_ONLINE_USERS_KEY = "server:interval_online_users" # 区间内在线用户集合 +INTERVAL_PLAYING_USERS_KEY = "server:interval_playing_users" # 区间内游玩用户集合 +CURRENT_INTERVAL_INFO_KEY = "server:current_interval_info" # 当前区间信息 + + +@dataclass +class IntervalInfo: + """区间信息""" + start_time: datetime + end_time: datetime + interval_key: str + + def is_current(self) -> bool: + """检查是否是当前区间""" + now = datetime.utcnow() + return self.start_time <= now < self.end_time + + def to_dict(self) -> Dict: + return { + 'start_time': self.start_time.isoformat(), + 'end_time': self.end_time.isoformat(), + 'interval_key': self.interval_key + } + + @classmethod + def from_dict(cls, data: Dict) -> 'IntervalInfo': + return cls( + start_time=datetime.fromisoformat(data['start_time']), + end_time=datetime.fromisoformat(data['end_time']), + interval_key=data['interval_key'] + ) + + +@dataclass +class IntervalStats: + """区间统计数据""" + interval_key: str + start_time: datetime + end_time: datetime + unique_online_users: int # 区间内独特在线用户数 + unique_playing_users: int # 区间内独特游玩用户数 + peak_online_count: int # 区间内在线用户数峰值 + peak_playing_count: int # 区间内游玩用户数峰值 + total_samples: int # 采样次数 + created_at: datetime + + def to_dict(self) -> Dict: + return { + 'interval_key': self.interval_key, + 'start_time': self.start_time.isoformat(), + 'end_time': self.end_time.isoformat(), + 'unique_online_users': self.unique_online_users, + 'unique_playing_users': self.unique_playing_users, + 'peak_online_count': self.peak_online_count, + 'peak_playing_count': self.peak_playing_count, + 'total_samples': self.total_samples, + 'created_at': self.created_at.isoformat() + } + + @classmethod + def from_dict(cls, data: Dict) -> 'IntervalStats': + return cls( + interval_key=data['interval_key'], + start_time=datetime.fromisoformat(data['start_time']), + end_time=datetime.fromisoformat(data['end_time']), + unique_online_users=data['unique_online_users'], + unique_playing_users=data['unique_playing_users'], + peak_online_count=data['peak_online_count'], + peak_playing_count=data['peak_playing_count'], + total_samples=data['total_samples'], + created_at=datetime.fromisoformat(data['created_at']) + ) + + +class EnhancedIntervalStatsManager: + """增强的区间统计管理器 - 真正统计半小时区间内的用户活跃情况""" + + @staticmethod + def get_current_interval_boundaries() -> tuple[datetime, datetime]: + """获取当前30分钟区间的边界""" + now = datetime.utcnow() + # 计算区间开始时间(向下取整到最近的30分钟) + minute = (now.minute // 30) * 30 + start_time = now.replace(minute=minute, second=0, microsecond=0) + # 区间结束时间 + end_time = start_time + timedelta(minutes=30) + return start_time, end_time + + @staticmethod + def generate_interval_key(start_time: datetime) -> str: + """生成区间唯一标识""" + return f"{INTERVAL_STATS_BASE_KEY}:{start_time.strftime('%Y%m%d_%H%M')}" + + @staticmethod + async def get_current_interval_info() -> IntervalInfo: + """获取当前区间信息""" + start_time, end_time = EnhancedIntervalStatsManager.get_current_interval_boundaries() + interval_key = EnhancedIntervalStatsManager.generate_interval_key(start_time) + + return IntervalInfo( + start_time=start_time, + end_time=end_time, + interval_key=interval_key + ) + + @staticmethod + async def initialize_current_interval() -> None: + """初始化当前区间""" + redis_sync = get_redis_message() + redis_async = get_redis() + + try: + current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() + + # 存储当前区间信息 + await _redis_exec( + redis_sync.set, + CURRENT_INTERVAL_INFO_KEY, + json.dumps(current_interval.to_dict()) + ) + await redis_async.expire(CURRENT_INTERVAL_INFO_KEY, 35 * 60) # 35分钟过期 + + # 初始化区间用户集合(如果不存在) + online_key = f"{INTERVAL_ONLINE_USERS_KEY}:{current_interval.interval_key}" + playing_key = f"{INTERVAL_PLAYING_USERS_KEY}:{current_interval.interval_key}" + + # 设置过期时间为35分钟 + await redis_async.expire(online_key, 35 * 60) + await redis_async.expire(playing_key, 35 * 60) + + # 初始化区间统计记录 + stats = IntervalStats( + interval_key=current_interval.interval_key, + start_time=current_interval.start_time, + end_time=current_interval.end_time, + unique_online_users=0, + unique_playing_users=0, + peak_online_count=0, + peak_playing_count=0, + total_samples=0, + created_at=datetime.utcnow() + ) + + await _redis_exec( + redis_sync.set, + current_interval.interval_key, + json.dumps(stats.to_dict()) + ) + await redis_async.expire(current_interval.interval_key, 35 * 60) + + # 如果历史记录为空,自动填充前24小时数据为0 + await EnhancedIntervalStatsManager._ensure_24h_history_exists() + + logger.info(f"Initialized interval stats for {current_interval.start_time.strftime('%H:%M')} - {current_interval.end_time.strftime('%H:%M')}") + + except Exception as e: + logger.error(f"Error initializing current interval: {e}") + + @staticmethod + async def _ensure_24h_history_exists() -> None: + """确保24小时历史数据存在,不存在则用0填充""" + redis_sync = get_redis_message() + redis_async = get_redis() + + try: + # 检查现有历史数据数量 + history_length = await _redis_exec(redis_sync.llen, REDIS_ONLINE_HISTORY_KEY) + + if history_length < 48: # 少于48个数据点(24小时*2) + logger.info(f"History has only {history_length} points, filling with zeros for 24h") + + # 计算需要填充的数据点数量 + needed_points = 48 - history_length + + # 从当前时间往前推,创建缺失的时间点(都填充为0) + current_time = datetime.utcnow() + current_interval_start, _ = EnhancedIntervalStatsManager.get_current_interval_boundaries() + + # 从当前区间开始往前推,创建历史数据点 + fill_points = [] + for i in range(needed_points): + # 每次往前推30分钟 + point_time = current_interval_start - timedelta(minutes=30 * (i + 1)) + + history_point = { + "timestamp": point_time.isoformat(), + "online_count": 0, + "playing_count": 0, + "peak_online": 0, + "peak_playing": 0, + "total_samples": 0 + } + fill_points.append(json.dumps(history_point)) + + # 将填充数据添加到历史记录末尾(最旧的数据) + if fill_points: + # 先将现有数据转移到临时位置 + temp_key = f"{REDIS_ONLINE_HISTORY_KEY}_temp" + if history_length > 0: + # 复制现有数据到临时key + existing_data = await _redis_exec(redis_sync.lrange, REDIS_ONLINE_HISTORY_KEY, 0, -1) + if existing_data: + for data in existing_data: + await _redis_exec(redis_sync.rpush, temp_key, data) + + # 清空原有key + await redis_async.delete(REDIS_ONLINE_HISTORY_KEY) + + # 先添加填充数据(最旧的) + for point in reversed(fill_points): # 反向添加,最旧的在最后 + await _redis_exec(redis_sync.rpush, REDIS_ONLINE_HISTORY_KEY, point) + + # 再添加原有数据(较新的) + if history_length > 0: + existing_data = await _redis_exec(redis_sync.lrange, temp_key, 0, -1) + for data in existing_data: + await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, data) + + # 清理临时key + await redis_async.delete(temp_key) + + # 确保只保留48个数据点 + await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) + + # 设置过期时间 + await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) + + logger.info(f"Filled {len(fill_points)} historical data points with zeros") + + except Exception as e: + logger.error(f"Error ensuring 24h history exists: {e}") + + @staticmethod + async def add_user_to_interval(user_id: int, is_playing: bool = False) -> None: + """添加用户到当前区间统计""" + redis_sync = get_redis_message() + redis_async = get_redis() + + try: + current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() + + # 添加到区间在线用户集合 + online_key = f"{INTERVAL_ONLINE_USERS_KEY}:{current_interval.interval_key}" + await _redis_exec(redis_sync.sadd, online_key, str(user_id)) + await redis_async.expire(online_key, 35 * 60) + + # 如果用户在游玩,也添加到游玩用户集合 + if is_playing: + playing_key = f"{INTERVAL_PLAYING_USERS_KEY}:{current_interval.interval_key}" + await _redis_exec(redis_sync.sadd, playing_key, str(user_id)) + await redis_async.expire(playing_key, 35 * 60) + + # 异步更新区间统计 + asyncio.create_task(EnhancedIntervalStatsManager._update_interval_stats()) + + except Exception as e: + logger.error(f"Error adding user {user_id} to interval: {e}") + + @staticmethod + async def _update_interval_stats() -> None: + """更新当前区间统计(内部方法)""" + redis_sync = get_redis_message() + redis_async = get_redis() + + try: + current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() + + # 获取区间内独特用户数 + online_key = f"{INTERVAL_ONLINE_USERS_KEY}:{current_interval.interval_key}" + playing_key = f"{INTERVAL_PLAYING_USERS_KEY}:{current_interval.interval_key}" + + unique_online = await _redis_exec(redis_sync.scard, online_key) + unique_playing = await _redis_exec(redis_sync.scard, playing_key) + + # 获取当前实时用户数作为峰值参考 + current_online = await _get_online_users_count(redis_async) + current_playing = await _get_playing_users_count(redis_async) + + # 获取现有统计数据 + existing_data = await _redis_exec(redis_sync.get, current_interval.interval_key) + if existing_data: + stats = IntervalStats.from_dict(json.loads(existing_data)) + # 更新峰值 + stats.peak_online_count = max(stats.peak_online_count, current_online) + stats.peak_playing_count = max(stats.peak_playing_count, current_playing) + stats.total_samples += 1 + else: + # 创建新的统计记录 + stats = IntervalStats( + interval_key=current_interval.interval_key, + start_time=current_interval.start_time, + end_time=current_interval.end_time, + unique_online_users=0, + unique_playing_users=0, + peak_online_count=current_online, + peak_playing_count=current_playing, + total_samples=1, + created_at=datetime.utcnow() + ) + + # 更新独特用户数 + stats.unique_online_users = unique_online + stats.unique_playing_users = unique_playing + + # 保存更新的统计数据 + await _redis_exec( + redis_sync.set, + current_interval.interval_key, + json.dumps(stats.to_dict()) + ) + await redis_async.expire(current_interval.interval_key, 35 * 60) + + except Exception as e: + logger.error(f"Error updating interval stats: {e}") + + @staticmethod + async def finalize_interval() -> Optional[IntervalStats]: + """完成当前区间统计并保存到历史""" + redis_sync = get_redis_message() + redis_async = get_redis() + + try: + current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() + + # 最后一次更新统计 + await EnhancedIntervalStatsManager._update_interval_stats() + + # 获取最终统计数据 + stats_data = await _redis_exec(redis_sync.get, current_interval.interval_key) + if not stats_data: + logger.warning("No interval stats found to finalize") + return None + + stats = IntervalStats.from_dict(json.loads(stats_data)) + + # 创建历史记录点(使用独特用户数作为主要统计) + history_point = { + "timestamp": stats.end_time.isoformat(), + "online_count": stats.unique_online_users, + "playing_count": stats.unique_playing_users, + "peak_online": stats.peak_online_count, + "peak_playing": stats.peak_playing_count, + "total_samples": stats.total_samples + } + + # 添加到历史记录 + await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)) + # 只保留48个数据点(24小时,每30分钟一个点) + await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) + # 设置过期时间为26小时,确保有足够缓冲 + await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) + + logger.info( + f"Finalized interval stats: " + f"unique_online={stats.unique_online_users}, " + f"unique_playing={stats.unique_playing_users}, " + f"peak_online={stats.peak_online_count}, " + f"peak_playing={stats.peak_playing_count}, " + f"samples={stats.total_samples} " + f"for {stats.start_time.strftime('%H:%M')}-{stats.end_time.strftime('%H:%M')}" + ) + + return stats + + except Exception as e: + logger.error(f"Error finalizing interval stats: {e}") + return None + + @staticmethod + async def get_current_interval_stats() -> Optional[IntervalStats]: + """获取当前区间统计""" + redis_sync = get_redis_message() + + try: + current_interval = await EnhancedIntervalStatsManager.get_current_interval_info() + stats_data = await _redis_exec(redis_sync.get, current_interval.interval_key) + + if stats_data: + return IntervalStats.from_dict(json.loads(stats_data)) + return None + + except Exception as e: + logger.error(f"Error getting current interval stats: {e}") + return None + + @staticmethod + async def cleanup_old_intervals() -> None: + """清理过期的区间数据""" + redis_async = get_redis() + + try: + # 删除过期的区间统计数据(超过2小时的) + cutoff_time = datetime.utcnow() - timedelta(hours=2) + pattern = f"{INTERVAL_STATS_BASE_KEY}:*" + + keys = await redis_async.keys(pattern) + for key in keys: + try: + # 从key中提取时间 + time_part = key.decode().split(':')[-1] # YYYYMMDD_HHMM格式 + key_time = datetime.strptime(time_part, '%Y%m%d_%H%M') + + if key_time < cutoff_time: + await redis_async.delete(key) + # 也删除对应的用户集合 + await redis_async.delete(f"{INTERVAL_ONLINE_USERS_KEY}:{key}") + await redis_async.delete(f"{INTERVAL_PLAYING_USERS_KEY}:{key}") + + except (ValueError, IndexError): + # 忽略解析错误的key + continue + + logger.debug("Cleaned up old interval data") + + except Exception as e: + logger.error(f"Error cleaning up old intervals: {e}") + + +# 便捷函数,用于替换现有的统计更新函数 +async def update_user_activity_in_interval(user_id: int, is_playing: bool = False) -> None: + """用户活动时更新区间统计(在登录、开始游玩等时调用)""" + await EnhancedIntervalStatsManager.add_user_to_interval(user_id, is_playing) diff --git a/app/service/interval_stats.py b/app/service/interval_stats.py deleted file mode 100644 index 30c23ba..0000000 --- a/app/service/interval_stats.py +++ /dev/null @@ -1,148 +0,0 @@ -from __future__ import annotations - -import json -from datetime import datetime, timedelta -from typing import Dict, Optional - -from app.dependencies.database import get_redis, get_redis_message -from app.log import logger -from app.router.v2.stats import ( - REDIS_ONLINE_HISTORY_KEY, - _get_online_users_count, - _get_playing_users_count, - _redis_exec -) - -# Redis key for current interval stats -CURRENT_INTERVAL_STATS_KEY = "server:current_interval_stats" - -class IntervalStatsManager: - """区间统计管理器 - 管理当前30分钟区间的实时统计""" - - @staticmethod - def _get_current_interval_key() -> str: - """获取当前30分钟区间的唯一标识""" - now = datetime.utcnow() - # 将时间对齐到30分钟区间 - interval_start = now.replace(minute=(now.minute // 30) * 30, second=0, microsecond=0) - return f"{CURRENT_INTERVAL_STATS_KEY}:{interval_start.strftime('%Y%m%d_%H%M')}" - - @staticmethod - async def update_current_interval() -> None: - """更新当前区间的统计数据""" - redis_sync = get_redis_message() - redis_async = get_redis() - - try: - # 获取当前在线和游玩用户数 - online_count = await _get_online_users_count(redis_async) - playing_count = await _get_playing_users_count(redis_async) - - current_time = datetime.utcnow() - interval_key = IntervalStatsManager._get_current_interval_key() - - # 准备区间统计数据 - interval_stats = { - "timestamp": current_time.isoformat(), - "online_count": online_count, - "playing_count": playing_count, - "last_updated": current_time.isoformat() - } - - # 存储当前区间统计 - await _redis_exec(redis_sync.set, interval_key, json.dumps(interval_stats)) - await redis_async.expire(interval_key, 35 * 60) # 35分钟过期,确保覆盖整个区间 - - logger.debug(f"Updated current interval stats: online={online_count}, playing={playing_count}") - - except Exception as e: - logger.error(f"Error updating current interval stats: {e}") - - @staticmethod - async def get_current_interval_stats() -> Optional[Dict]: - """获取当前区间的统计数据""" - redis_sync = get_redis_message() - - try: - interval_key = IntervalStatsManager._get_current_interval_key() - stats_data = await _redis_exec(redis_sync.get, interval_key) - - if stats_data: - return json.loads(stats_data) - return None - - except Exception as e: - logger.error(f"Error getting current interval stats: {e}") - return None - - @staticmethod - async def finalize_current_interval() -> bool: - """完成当前区间统计,将其添加到历史记录中""" - redis_sync = get_redis_message() - redis_async = get_redis() - - try: - # 获取当前区间的统计 - current_stats = await IntervalStatsManager.get_current_interval_stats() - if not current_stats: - # 如果没有当前区间数据,使用实时数据 - online_count = await _get_online_users_count(redis_async) - playing_count = await _get_playing_users_count(redis_async) - current_time = datetime.utcnow() - - current_stats = { - "timestamp": current_time.isoformat(), - "online_count": online_count, - "playing_count": playing_count - } - - # 调整时间戳到区间结束时间 - now = datetime.utcnow() - interval_end = now.replace(minute=(now.minute // 30) * 30, second=0, microsecond=0) - if now.minute % 30 != 0 or now.second != 0: - interval_end += timedelta(minutes=30) - - history_point = { - "timestamp": interval_end.isoformat(), - "online_count": current_stats["online_count"], - "playing_count": current_stats["playing_count"] - } - - # 添加到历史记录 - await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)) - # 只保留48个数据点(24小时,每30分钟一个点) - await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) - # 设置过期时间为26小时,确保有足够缓冲 - await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 26 * 3600) - - logger.info(f"Finalized interval stats: online={current_stats['online_count']}, playing={current_stats['playing_count']} at {interval_end.strftime('%H:%M:%S')}") - return True - - except Exception as e: - logger.error(f"Error finalizing current interval: {e}") - return False - -# 便捷函数 -async def update_user_activity_stats() -> None: - """在用户活动时更新统计(登录、开始游玩等)""" - await IntervalStatsManager.update_current_interval() - -async def get_enhanced_current_stats() -> Dict: - """获取增强的当前统计,包含区间数据""" - from app.router.v2.stats import get_server_stats - - # 获取基础统计 - current_stats = await get_server_stats() - - # 获取区间统计 - interval_stats = await IntervalStatsManager.get_current_interval_stats() - - result = { - "registered_users": current_stats.registered_users, - "online_users": current_stats.online_users, - "playing_users": current_stats.playing_users, - "timestamp": current_stats.timestamp.isoformat(), - "interval_data": interval_stats - } - - return result diff --git a/app/service/stats_scheduler.py b/app/service/stats_scheduler.py index db32f47..10faabe 100644 --- a/app/service/stats_scheduler.py +++ b/app/service/stats_scheduler.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta from app.log import logger from app.router.v2.stats import record_hourly_stats, update_registered_users_count from app.service.stats_cleanup import cleanup_stale_online_users, refresh_redis_key_expiry -from app.service.interval_stats import IntervalStatsManager, update_user_activity_stats +from app.service.enhanced_interval_stats import EnhancedIntervalStatsManager class StatsScheduler: @@ -49,10 +49,10 @@ class StatsScheduler: """统计数据记录循环 - 每30分钟记录一次""" # 启动时立即记录一次统计数据 try: - await IntervalStatsManager.update_current_interval() - logger.info("Initial interval statistics updated on startup") + await EnhancedIntervalStatsManager.initialize_current_interval() + logger.info("Initial enhanced interval statistics initialized on startup") except Exception as e: - logger.error(f"Error updating initial interval stats: {e}") + logger.error(f"Error initializing enhanced interval stats: {e}") while self._running: try: @@ -73,16 +73,16 @@ class StatsScheduler: break # 完成当前区间并记录到历史 - success = await IntervalStatsManager.finalize_current_interval() - if success: - logger.info(f"Finalized interval statistics at {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}") + finalized_stats = await EnhancedIntervalStatsManager.finalize_interval() + if finalized_stats: + logger.info(f"Finalized enhanced interval statistics at {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}") else: # 如果区间完成失败,使用原有方式记录 await record_hourly_stats() logger.info(f"Recorded hourly statistics (fallback) at {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')}") # 开始新的区间统计 - await IntervalStatsManager.update_current_interval() + await EnhancedIntervalStatsManager.initialize_current_interval() except Exception as e: logger.error(f"Error in stats loop: {e}") @@ -139,8 +139,8 @@ class StatsScheduler: # 刷新Redis key过期时间 await refresh_redis_key_expiry() - # 更新当前区间统计(每10分钟更新一次以保持数据新鲜) - await IntervalStatsManager.update_current_interval() + # 清理过期的区间数据 + await EnhancedIntervalStatsManager.cleanup_old_intervals() except Exception as e: logger.error(f"Error in cleanup loop: {e}") diff --git a/test_stats_api.py b/test_stats_api.py deleted file mode 100644 index d4bf531..0000000 --- a/test_stats_api.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python3 -""" -服务器统计API测试脚本 -""" - -import asyncio -import json -from datetime import datetime - -import httpx - - -async def test_stats_api(): - """测试统计API""" - base_url = "http://localhost:8000" # 根据实际服务器地址修改 - - async with httpx.AsyncClient() as client: - print("🧪 测试服务器统计API...") - - # 测试服务器统计信息接口 - print("\n1. 测试 /api/v2/stats 端点...") - try: - response = await client.get(f"{base_url}/api/v2/stats") - if response.status_code == 200: - data = response.json() - print(f"✅ 成功获取服务器统计信息:") - print(f" - 注册用户: {data['registered_users']}") - print(f" - 在线用户: {data['online_users']}") - print(f" - 游玩用户: {data['playing_users']}") - print(f" - 更新时间: {data['timestamp']}") - else: - print(f"❌ 请求失败: HTTP {response.status_code}") - print(f" 响应: {response.text}") - except Exception as e: - print(f"❌ 请求异常: {e}") - - # 测试在线历史接口 - print("\n2. 测试 /api/v2/stats/history 端点...") - try: - response = await client.get(f"{base_url}/api/v2/stats/history") - if response.status_code == 200: - data = response.json() - print(f"✅ 成功获取在线历史信息:") - print(f" - 历史数据点数: {len(data['history'])}") - print(f" - 当前统计信息:") - current = data['current_stats'] - print(f" - 注册用户: {current['registered_users']}") - print(f" - 在线用户: {current['online_users']}") - print(f" - 游玩用户: {current['playing_users']}") - - if data['history']: - latest = data['history'][0] - print(f" - 最新历史记录:") - print(f" - 时间: {latest['timestamp']}") - print(f" - 在线数: {latest['online_count']}") - print(f" - 游玩数: {latest['playing_count']}") - else: - print(f" - 暂无历史数据(需要等待调度器记录)") - else: - print(f"❌ 请求失败: HTTP {response.status_code}") - print(f" 响应: {response.text}") - except Exception as e: - print(f"❌ 请求异常: {e}") - - -async def test_internal_functions(): - """测试内部函数""" - print("\n🔧 测试内部Redis函数...") - - try: - from app.router.v2.stats import ( - add_online_user, - remove_online_user, - add_playing_user, - remove_playing_user, - record_hourly_stats, - update_registered_users_count - ) - - # 测试添加用户 - print(" 测试添加在线用户...") - await add_online_user(999999) # 测试用户ID - - print(" 测试添加游玩用户...") - await add_playing_user(999999) - - print(" 测试记录统计数据...") - await record_hourly_stats() - - print(" 测试移除用户...") - await remove_playing_user(999999) - await remove_online_user(999999) - - print(" 测试更新注册用户数...") - await update_registered_users_count() - - print("✅ 内部函数测试完成") - - except Exception as e: - print(f"❌ 内部函数测试异常: {e}") - - -if __name__ == "__main__": - print("🚀 开始测试服务器统计功能...") - - # 首先测试内部函数 - asyncio.run(test_internal_functions()) - - # 然后测试API端点 - asyncio.run(test_stats_api()) - - print("\n✨ 测试完成!")