修复数据统计问题
This commit is contained in:
@@ -40,6 +40,9 @@ class OnlineHistoryPoint(BaseModel):
|
||||
timestamp: datetime
|
||||
online_count: int
|
||||
playing_count: int
|
||||
peak_online: int | None = None # 峰值在线数(增强数据)
|
||||
peak_playing: int | None = None # 峰值游玩数(增强数据)
|
||||
total_samples: int | None = None # 采样次数(增强数据)
|
||||
|
||||
class OnlineHistoryResponse(BaseModel):
|
||||
"""24小时在线历史响应模型"""
|
||||
@@ -87,8 +90,6 @@ async def get_online_history() -> OnlineHistoryResponse:
|
||||
返回过去24小时内每小时的在线用户数和游玩用户数统计,
|
||||
包含当前实时数据作为最新数据点
|
||||
"""
|
||||
redis = get_redis()
|
||||
|
||||
try:
|
||||
# 获取历史数据 - 使用同步Redis客户端
|
||||
redis_sync = get_redis_message()
|
||||
@@ -103,7 +104,10 @@ async def get_online_history() -> OnlineHistoryResponse:
|
||||
history_points.append(OnlineHistoryPoint(
|
||||
timestamp=datetime.fromisoformat(point_data["timestamp"]),
|
||||
online_count=point_data["online_count"],
|
||||
playing_count=point_data["playing_count"]
|
||||
playing_count=point_data["playing_count"],
|
||||
peak_online=point_data.get("peak_online"), # 新字段,可能不存在
|
||||
peak_playing=point_data.get("peak_playing"), # 新字段,可能不存在
|
||||
total_samples=point_data.get("total_samples") # 新字段,可能不存在
|
||||
))
|
||||
except (json.JSONDecodeError, KeyError, ValueError) as e:
|
||||
logger.warning(f"Invalid history data point: {data}, error: {e}")
|
||||
@@ -112,19 +116,19 @@ async def get_online_history() -> OnlineHistoryResponse:
|
||||
# 获取当前实时统计信息
|
||||
current_stats = await get_server_stats()
|
||||
|
||||
# 将当前实时数据作为最新的数据点添加到历史中(如果需要)
|
||||
current_point = OnlineHistoryPoint(
|
||||
timestamp=current_stats.timestamp,
|
||||
online_count=current_stats.online_users,
|
||||
playing_count=current_stats.playing_users
|
||||
)
|
||||
|
||||
# 如果历史数据为空或者最新数据超过15分钟,添加当前数据点
|
||||
if not history_points or (
|
||||
history_points and
|
||||
(current_stats.timestamp - max(history_points, key=lambda x: x.timestamp).timestamp).total_seconds() > 15 * 60
|
||||
):
|
||||
history_points.append(current_point)
|
||||
history_points.append(OnlineHistoryPoint(
|
||||
timestamp=current_stats.timestamp,
|
||||
online_count=current_stats.online_users,
|
||||
playing_count=current_stats.playing_users,
|
||||
peak_online=current_stats.online_users, # 当前实时数据作为峰值
|
||||
peak_playing=current_stats.playing_users,
|
||||
total_samples=1
|
||||
))
|
||||
|
||||
# 按时间排序(最新的在前)
|
||||
history_points.sort(key=lambda x: x.timestamp, reverse=True)
|
||||
@@ -145,6 +149,7 @@ async def get_online_history() -> OnlineHistoryResponse:
|
||||
current_stats=current_stats
|
||||
)
|
||||
|
||||
|
||||
async def _get_registered_users_count(redis) -> int:
|
||||
"""获取注册用户总数(从缓存)"""
|
||||
try:
|
||||
|
||||
@@ -199,12 +199,16 @@ class EnhancedIntervalStatsManager:
|
||||
current_time = datetime.utcnow()
|
||||
current_interval_start, _ = EnhancedIntervalStatsManager.get_current_interval_boundaries()
|
||||
|
||||
# 从当前区间开始往前推,创建历史数据点
|
||||
# 从当前区间开始往前推,创建历史数据点(确保时间对齐到30分钟边界)
|
||||
fill_points = []
|
||||
for i in range(needed_points):
|
||||
# 每次往前推30分钟
|
||||
# 每次往前推30分钟,确保时间对齐
|
||||
point_time = current_interval_start - timedelta(minutes=30 * (i + 1))
|
||||
|
||||
# 确保时间对齐到30分钟边界
|
||||
aligned_minute = (point_time.minute // 30) * 30
|
||||
point_time = point_time.replace(minute=aligned_minute, second=0, microsecond=0)
|
||||
|
||||
history_point = {
|
||||
"timestamp": point_time.isoformat(),
|
||||
"online_count": 0,
|
||||
@@ -255,7 +259,7 @@ class EnhancedIntervalStatsManager:
|
||||
|
||||
@staticmethod
|
||||
async def add_user_to_interval(user_id: int, is_playing: bool = False) -> None:
|
||||
"""添加用户到当前区间统计"""
|
||||
"""添加用户到当前区间统计 - 实时更新当前运行的区间"""
|
||||
redis_sync = get_redis_message()
|
||||
redis_async = get_redis()
|
||||
|
||||
@@ -273,15 +277,17 @@ class EnhancedIntervalStatsManager:
|
||||
await _redis_exec(redis_sync.sadd, playing_key, str(user_id))
|
||||
await redis_async.expire(playing_key, 35 * 60)
|
||||
|
||||
# 异步更新区间统计
|
||||
asyncio.create_task(EnhancedIntervalStatsManager._update_interval_stats())
|
||||
# 立即更新区间统计(同步更新,确保数据实时性)
|
||||
await EnhancedIntervalStatsManager._update_interval_stats()
|
||||
|
||||
logger.debug(f"Added user {user_id} to current interval {current_interval.start_time.strftime('%H:%M')}-{current_interval.end_time.strftime('%H:%M')}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error adding user {user_id} to interval: {e}")
|
||||
|
||||
@staticmethod
|
||||
async def _update_interval_stats() -> None:
|
||||
"""更新当前区间统计(内部方法)"""
|
||||
"""更新当前区间统计 - 立即同步更新"""
|
||||
redis_sync = get_redis_message()
|
||||
redis_async = get_redis()
|
||||
|
||||
@@ -325,7 +331,7 @@ class EnhancedIntervalStatsManager:
|
||||
stats.unique_online_users = unique_online
|
||||
stats.unique_playing_users = unique_playing
|
||||
|
||||
# 保存更新的统计数据
|
||||
# 立即保存更新的统计数据
|
||||
await _redis_exec(
|
||||
redis_sync.set,
|
||||
current_interval.interval_key,
|
||||
@@ -333,6 +339,8 @@ class EnhancedIntervalStatsManager:
|
||||
)
|
||||
await redis_async.expire(current_interval.interval_key, 35 * 60)
|
||||
|
||||
logger.debug(f"Updated interval stats: online={unique_online}, playing={unique_playing}, peak_online={stats.peak_online_count}, peak_playing={stats.peak_playing_count}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating interval stats: {e}")
|
||||
|
||||
@@ -356,9 +364,9 @@ class EnhancedIntervalStatsManager:
|
||||
|
||||
stats = IntervalStats.from_dict(json.loads(stats_data))
|
||||
|
||||
# 创建历史记录点(使用独特用户数作为主要统计)
|
||||
# 创建历史记录点(使用区间结束时间作为时间戳,确保时间对齐)
|
||||
history_point = {
|
||||
"timestamp": stats.end_time.isoformat(),
|
||||
"timestamp": current_interval.end_time.isoformat(),
|
||||
"online_count": stats.unique_online_users,
|
||||
"playing_count": stats.unique_playing_users,
|
||||
"peak_online": stats.peak_online_count,
|
||||
|
||||
@@ -58,15 +58,22 @@ class StatsScheduler:
|
||||
try:
|
||||
# 计算下次记录时间(下个30分钟整点)
|
||||
now = datetime.utcnow()
|
||||
minutes_until_next = 30 - (now.minute % 30)
|
||||
next_record_time = now.replace(second=0, microsecond=0) + timedelta(minutes=minutes_until_next)
|
||||
|
||||
# 计算需要等待的秒数
|
||||
sleep_seconds = (next_record_time - now).total_seconds()
|
||||
# 计算当前区间边界
|
||||
current_minute = (now.minute // 30) * 30
|
||||
current_interval_end = now.replace(minute=current_minute, second=0, microsecond=0) + timedelta(minutes=30)
|
||||
|
||||
# 确保至少等待30分钟,但不超过31分钟(防止时间漂移)
|
||||
sleep_seconds = max(min(sleep_seconds, 31 * 60), 30 * 60)
|
||||
# 如果已经过了当前区间结束时间,立即处理
|
||||
if now >= current_interval_end:
|
||||
current_interval_end += timedelta(minutes=30)
|
||||
|
||||
# 计算需要等待的时间(到下个区间结束)
|
||||
sleep_seconds = (current_interval_end - now).total_seconds()
|
||||
|
||||
# 确保至少等待1分钟,最多等待31分钟
|
||||
sleep_seconds = max(min(sleep_seconds, 31 * 60), 60)
|
||||
|
||||
logger.debug(f"Next interval finalization in {sleep_seconds/60:.1f} minutes at {current_interval_end.strftime('%H:%M:%S')}")
|
||||
await asyncio.sleep(sleep_seconds)
|
||||
|
||||
if not self._running:
|
||||
|
||||
Reference in New Issue
Block a user