From ad131c015890f07c720d9c742d349f5748732cff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 05:56:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=95=B0=E6=8D=AE=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/router/v2/stats.py | 27 +++++++++++++++----------- app/service/enhanced_interval_stats.py | 26 ++++++++++++++++--------- app/service/stats_scheduler.py | 19 ++++++++++++------ 3 files changed, 46 insertions(+), 26 deletions(-) diff --git a/app/router/v2/stats.py b/app/router/v2/stats.py index e247fd3..e8e90e2 100644 --- a/app/router/v2/stats.py +++ b/app/router/v2/stats.py @@ -40,6 +40,9 @@ class OnlineHistoryPoint(BaseModel): timestamp: datetime online_count: int playing_count: int + peak_online: int | None = None # 峰值在线数(增强数据) + peak_playing: int | None = None # 峰值游玩数(增强数据) + total_samples: int | None = None # 采样次数(增强数据) class OnlineHistoryResponse(BaseModel): """24小时在线历史响应模型""" @@ -87,8 +90,6 @@ async def get_online_history() -> OnlineHistoryResponse: 返回过去24小时内每小时的在线用户数和游玩用户数统计, 包含当前实时数据作为最新数据点 """ - redis = get_redis() - try: # 获取历史数据 - 使用同步Redis客户端 redis_sync = get_redis_message() @@ -103,7 +104,10 @@ async def get_online_history() -> OnlineHistoryResponse: history_points.append(OnlineHistoryPoint( timestamp=datetime.fromisoformat(point_data["timestamp"]), online_count=point_data["online_count"], - playing_count=point_data["playing_count"] + playing_count=point_data["playing_count"], + peak_online=point_data.get("peak_online"), # 新字段,可能不存在 + peak_playing=point_data.get("peak_playing"), # 新字段,可能不存在 + total_samples=point_data.get("total_samples") # 新字段,可能不存在 )) except (json.JSONDecodeError, KeyError, ValueError) as e: logger.warning(f"Invalid history data point: {data}, error: {e}") @@ -112,19 +116,19 @@ async def get_online_history() -> OnlineHistoryResponse: # 获取当前实时统计信息 current_stats = await get_server_stats() - # 将当前实时数据作为最新的数据点添加到历史中(如果需要) - current_point = OnlineHistoryPoint( - timestamp=current_stats.timestamp, - online_count=current_stats.online_users, - playing_count=current_stats.playing_users - ) - # 如果历史数据为空或者最新数据超过15分钟,添加当前数据点 if not history_points or ( history_points and (current_stats.timestamp - max(history_points, key=lambda x: x.timestamp).timestamp).total_seconds() > 15 * 60 ): - history_points.append(current_point) + history_points.append(OnlineHistoryPoint( + timestamp=current_stats.timestamp, + online_count=current_stats.online_users, + playing_count=current_stats.playing_users, + peak_online=current_stats.online_users, # 当前实时数据作为峰值 + peak_playing=current_stats.playing_users, + total_samples=1 + )) # 按时间排序(最新的在前) history_points.sort(key=lambda x: x.timestamp, reverse=True) @@ -145,6 +149,7 @@ async def get_online_history() -> OnlineHistoryResponse: current_stats=current_stats ) + async def _get_registered_users_count(redis) -> int: """获取注册用户总数(从缓存)""" try: diff --git a/app/service/enhanced_interval_stats.py b/app/service/enhanced_interval_stats.py index bb67dbc..21d97fb 100644 --- a/app/service/enhanced_interval_stats.py +++ b/app/service/enhanced_interval_stats.py @@ -199,12 +199,16 @@ class EnhancedIntervalStatsManager: current_time = datetime.utcnow() current_interval_start, _ = EnhancedIntervalStatsManager.get_current_interval_boundaries() - # 从当前区间开始往前推,创建历史数据点 + # 从当前区间开始往前推,创建历史数据点(确保时间对齐到30分钟边界) fill_points = [] for i in range(needed_points): - # 每次往前推30分钟 + # 每次往前推30分钟,确保时间对齐 point_time = current_interval_start - timedelta(minutes=30 * (i + 1)) + # 确保时间对齐到30分钟边界 + aligned_minute = (point_time.minute // 30) * 30 + point_time = point_time.replace(minute=aligned_minute, second=0, microsecond=0) + history_point = { "timestamp": point_time.isoformat(), "online_count": 0, @@ -255,7 +259,7 @@ class EnhancedIntervalStatsManager: @staticmethod async def add_user_to_interval(user_id: int, is_playing: bool = False) -> None: - """添加用户到当前区间统计""" + """添加用户到当前区间统计 - 实时更新当前运行的区间""" redis_sync = get_redis_message() redis_async = get_redis() @@ -273,15 +277,17 @@ class EnhancedIntervalStatsManager: await _redis_exec(redis_sync.sadd, playing_key, str(user_id)) await redis_async.expire(playing_key, 35 * 60) - # 异步更新区间统计 - asyncio.create_task(EnhancedIntervalStatsManager._update_interval_stats()) + # 立即更新区间统计(同步更新,确保数据实时性) + await EnhancedIntervalStatsManager._update_interval_stats() + + logger.debug(f"Added user {user_id} to current interval {current_interval.start_time.strftime('%H:%M')}-{current_interval.end_time.strftime('%H:%M')}") except Exception as e: logger.error(f"Error adding user {user_id} to interval: {e}") @staticmethod async def _update_interval_stats() -> None: - """更新当前区间统计(内部方法)""" + """更新当前区间统计 - 立即同步更新""" redis_sync = get_redis_message() redis_async = get_redis() @@ -325,7 +331,7 @@ class EnhancedIntervalStatsManager: stats.unique_online_users = unique_online stats.unique_playing_users = unique_playing - # 保存更新的统计数据 + # 立即保存更新的统计数据 await _redis_exec( redis_sync.set, current_interval.interval_key, @@ -333,6 +339,8 @@ class EnhancedIntervalStatsManager: ) await redis_async.expire(current_interval.interval_key, 35 * 60) + logger.debug(f"Updated interval stats: online={unique_online}, playing={unique_playing}, peak_online={stats.peak_online_count}, peak_playing={stats.peak_playing_count}") + except Exception as e: logger.error(f"Error updating interval stats: {e}") @@ -356,9 +364,9 @@ class EnhancedIntervalStatsManager: stats = IntervalStats.from_dict(json.loads(stats_data)) - # 创建历史记录点(使用独特用户数作为主要统计) + # 创建历史记录点(使用区间结束时间作为时间戳,确保时间对齐) history_point = { - "timestamp": stats.end_time.isoformat(), + "timestamp": current_interval.end_time.isoformat(), "online_count": stats.unique_online_users, "playing_count": stats.unique_playing_users, "peak_online": stats.peak_online_count, diff --git a/app/service/stats_scheduler.py b/app/service/stats_scheduler.py index 10faabe..61c1210 100644 --- a/app/service/stats_scheduler.py +++ b/app/service/stats_scheduler.py @@ -58,15 +58,22 @@ class StatsScheduler: try: # 计算下次记录时间(下个30分钟整点) now = datetime.utcnow() - minutes_until_next = 30 - (now.minute % 30) - next_record_time = now.replace(second=0, microsecond=0) + timedelta(minutes=minutes_until_next) - # 计算需要等待的秒数 - sleep_seconds = (next_record_time - now).total_seconds() + # 计算当前区间边界 + current_minute = (now.minute // 30) * 30 + current_interval_end = now.replace(minute=current_minute, second=0, microsecond=0) + timedelta(minutes=30) - # 确保至少等待30分钟,但不超过31分钟(防止时间漂移) - sleep_seconds = max(min(sleep_seconds, 31 * 60), 30 * 60) + # 如果已经过了当前区间结束时间,立即处理 + if now >= current_interval_end: + current_interval_end += timedelta(minutes=30) + # 计算需要等待的时间(到下个区间结束) + sleep_seconds = (current_interval_end - now).total_seconds() + + # 确保至少等待1分钟,最多等待31分钟 + sleep_seconds = max(min(sleep_seconds, 31 * 60), 60) + + logger.debug(f"Next interval finalization in {sleep_seconds/60:.1f} minutes at {current_interval_end.strftime('%H:%M:%S')}") await asyncio.sleep(sleep_seconds) if not self._running: