From 845aab4aedf98fc0ead431a7393dbafc1bd71cc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 06:11:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8C=BA=E9=97=B4=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/router/v2/stats.py | 20 +++----------- app/service/enhanced_interval_stats.py | 37 +++++++++++++------------- app/service/stats_scheduler.py | 29 ++++++++++---------- 3 files changed, 38 insertions(+), 48 deletions(-) diff --git a/app/router/v2/stats.py b/app/router/v2/stats.py index 669e924..c0c83dd 100644 --- a/app/router/v2/stats.py +++ b/app/router/v2/stats.py @@ -40,9 +40,6 @@ class OnlineHistoryPoint(BaseModel): timestamp: datetime online_count: int playing_count: int - peak_online: int | None = None # 峰值在线数(增强数据) - peak_playing: int | None = None # 峰值游玩数(增强数据) - total_samples: int | None = None # 采样次数(增强数据) class OnlineHistoryResponse(BaseModel): """24小时在线历史响应模型""" @@ -100,14 +97,11 @@ async def get_online_history() -> OnlineHistoryResponse: for data in history_data: try: point_data = json.loads(data) - # 支持新旧格式的历史数据 + # 只保留基本字段 history_points.append(OnlineHistoryPoint( timestamp=datetime.fromisoformat(point_data["timestamp"]), online_count=point_data["online_count"], - playing_count=point_data["playing_count"], - peak_online=point_data.get("peak_online"), # 新字段,可能不存在 - peak_playing=point_data.get("peak_playing"), # 新字段,可能不存在 - total_samples=point_data.get("total_samples") # 新字段,可能不存在 + playing_count=point_data["playing_count"] )) except (json.JSONDecodeError, KeyError, ValueError) as e: logger.warning(f"Invalid history data point: {data}, error: {e}") @@ -124,10 +118,7 @@ async def get_online_history() -> OnlineHistoryResponse: history_points.append(OnlineHistoryPoint( timestamp=current_stats.timestamp, online_count=current_stats.online_users, - playing_count=current_stats.playing_users, - peak_online=current_stats.online_users, # 当前实时数据作为峰值 - peak_playing=current_stats.playing_users, - total_samples=1 + playing_count=current_stats.playing_users )) # 按时间排序(最新的在前) @@ -325,10 +316,7 @@ async def record_hourly_stats() -> None: history_point = { "timestamp": current_time.isoformat(), "online_count": online_count, - "playing_count": playing_count, - "peak_online": online_count, - "peak_playing": playing_count, - "total_samples": 1 + "playing_count": playing_count } # 添加到历史记录 diff --git a/app/service/enhanced_interval_stats.py b/app/service/enhanced_interval_stats.py index 60ac4bc..671ffb9 100644 --- a/app/service/enhanced_interval_stats.py +++ b/app/service/enhanced_interval_stats.py @@ -228,10 +228,7 @@ class EnhancedIntervalStatsManager: history_point = { "timestamp": point_time.isoformat(), "online_count": 0, - "playing_count": 0, - "peak_online": 0, - "peak_playing": 0, - "total_samples": 0, + "playing_count": 0 } fill_points.append(json.dumps(history_point)) @@ -388,36 +385,40 @@ class EnhancedIntervalStatsManager: @staticmethod async def finalize_interval() -> IntervalStats | None: - """完成当前区间统计并保存到历史""" + """完成上一个已结束的区间统计并保存到历史""" redis_sync = get_redis_message() redis_async = get_redis() try: - current_interval = ( - await EnhancedIntervalStatsManager.get_current_interval_info() + # 获取上一个已完成区间(当前区间的前一个) + current_start, current_end = EnhancedIntervalStatsManager.get_current_interval_boundaries() + # 上一个区间开始时间是当前区间开始时间减去30分钟 + previous_start = current_start - timedelta(minutes=30) + previous_end = current_start # 上一个区间的结束时间就是当前区间的开始时间 + + interval_key = EnhancedIntervalStatsManager.generate_interval_key(previous_start) + + previous_interval = IntervalInfo( + start_time=previous_start, + end_time=previous_end, + interval_key=interval_key ) - # 最后一次更新统计 - await EnhancedIntervalStatsManager._update_interval_stats() - # 获取最终统计数据 stats_data = await _redis_exec( - redis_sync.get, current_interval.interval_key + redis_sync.get, previous_interval.interval_key ) if not stats_data: - logger.warning("No interval stats found to finalize") + logger.warning(f"No interval stats found to finalize for {previous_interval.start_time.strftime('%H:%M')}") return None stats = IntervalStats.from_dict(json.loads(stats_data)) - # 创建历史记录点(使用区间结束时间作为时间戳,确保时间对齐) + # 创建历史记录点(使用区间开始时间作为时间戳) history_point = { - "timestamp": current_interval.end_time.isoformat(), + "timestamp": previous_interval.start_time.isoformat(), "online_count": stats.unique_online_users, - "playing_count": stats.unique_playing_users, - "peak_online": stats.peak_online_count, - "peak_playing": stats.peak_playing_count, - "total_samples": stats.total_samples, + "playing_count": stats.unique_playing_users } # 添加到历史记录 diff --git a/app/service/stats_scheduler.py b/app/service/stats_scheduler.py index 02adc73..cef88d1 100644 --- a/app/service/stats_scheduler.py +++ b/app/service/stats_scheduler.py @@ -59,25 +59,26 @@ class StatsScheduler: while self._running: try: - # 计算下次记录时间(下个30分钟整点) + # 计算下次区间结束时间 now = datetime.utcnow() - - # 计算当前区间边界 + + # 计算当前区间的结束时间 current_minute = (now.minute // 30) * 30 - current_interval_end = now.replace( - minute=current_minute, second=0, microsecond=0 - ) + timedelta(minutes=30) - - # 如果已经过了当前区间结束时间,立即处理 + current_interval_end = now.replace(minute=current_minute, second=0, microsecond=0) + timedelta(minutes=30) + + # 如果当前时间已经超过了当前区间结束时间,说明需要等待下一个区间结束 if now >= current_interval_end: current_interval_end += timedelta(minutes=30) - - # 计算需要等待的时间(到下个区间结束) + + # 计算需要等待的时间 sleep_seconds = (current_interval_end - now).total_seconds() - - # 确保至少等待1分钟,最多等待31分钟 - sleep_seconds = max(min(sleep_seconds, 31 * 60), 60) - + + # 添加小的缓冲时间,确保区间真正结束后再处理 + sleep_seconds += 10 # 额外等待10秒 + + # 限制等待时间范围 + sleep_seconds = max(min(sleep_seconds, 32 * 60), 10) + logger.debug( f"Next interval finalization in {sleep_seconds / 60:.1f} minutes at {current_interval_end.strftime('%H:%M:%S')}" )