修复区间问题
This commit is contained in:
@@ -40,9 +40,6 @@ class OnlineHistoryPoint(BaseModel):
|
|||||||
timestamp: datetime
|
timestamp: datetime
|
||||||
online_count: int
|
online_count: int
|
||||||
playing_count: int
|
playing_count: int
|
||||||
peak_online: int | None = None # 峰值在线数(增强数据)
|
|
||||||
peak_playing: int | None = None # 峰值游玩数(增强数据)
|
|
||||||
total_samples: int | None = None # 采样次数(增强数据)
|
|
||||||
|
|
||||||
class OnlineHistoryResponse(BaseModel):
|
class OnlineHistoryResponse(BaseModel):
|
||||||
"""24小时在线历史响应模型"""
|
"""24小时在线历史响应模型"""
|
||||||
@@ -100,14 +97,11 @@ async def get_online_history() -> OnlineHistoryResponse:
|
|||||||
for data in history_data:
|
for data in history_data:
|
||||||
try:
|
try:
|
||||||
point_data = json.loads(data)
|
point_data = json.loads(data)
|
||||||
# 支持新旧格式的历史数据
|
# 只保留基本字段
|
||||||
history_points.append(OnlineHistoryPoint(
|
history_points.append(OnlineHistoryPoint(
|
||||||
timestamp=datetime.fromisoformat(point_data["timestamp"]),
|
timestamp=datetime.fromisoformat(point_data["timestamp"]),
|
||||||
online_count=point_data["online_count"],
|
online_count=point_data["online_count"],
|
||||||
playing_count=point_data["playing_count"],
|
playing_count=point_data["playing_count"]
|
||||||
peak_online=point_data.get("peak_online"), # 新字段,可能不存在
|
|
||||||
peak_playing=point_data.get("peak_playing"), # 新字段,可能不存在
|
|
||||||
total_samples=point_data.get("total_samples") # 新字段,可能不存在
|
|
||||||
))
|
))
|
||||||
except (json.JSONDecodeError, KeyError, ValueError) as e:
|
except (json.JSONDecodeError, KeyError, ValueError) as e:
|
||||||
logger.warning(f"Invalid history data point: {data}, error: {e}")
|
logger.warning(f"Invalid history data point: {data}, error: {e}")
|
||||||
@@ -124,10 +118,7 @@ async def get_online_history() -> OnlineHistoryResponse:
|
|||||||
history_points.append(OnlineHistoryPoint(
|
history_points.append(OnlineHistoryPoint(
|
||||||
timestamp=current_stats.timestamp,
|
timestamp=current_stats.timestamp,
|
||||||
online_count=current_stats.online_users,
|
online_count=current_stats.online_users,
|
||||||
playing_count=current_stats.playing_users,
|
playing_count=current_stats.playing_users
|
||||||
peak_online=current_stats.online_users, # 当前实时数据作为峰值
|
|
||||||
peak_playing=current_stats.playing_users,
|
|
||||||
total_samples=1
|
|
||||||
))
|
))
|
||||||
|
|
||||||
# 按时间排序(最新的在前)
|
# 按时间排序(最新的在前)
|
||||||
@@ -325,10 +316,7 @@ async def record_hourly_stats() -> None:
|
|||||||
history_point = {
|
history_point = {
|
||||||
"timestamp": current_time.isoformat(),
|
"timestamp": current_time.isoformat(),
|
||||||
"online_count": online_count,
|
"online_count": online_count,
|
||||||
"playing_count": playing_count,
|
"playing_count": playing_count
|
||||||
"peak_online": online_count,
|
|
||||||
"peak_playing": playing_count,
|
|
||||||
"total_samples": 1
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# 添加到历史记录
|
# 添加到历史记录
|
||||||
|
|||||||
@@ -228,10 +228,7 @@ class EnhancedIntervalStatsManager:
|
|||||||
history_point = {
|
history_point = {
|
||||||
"timestamp": point_time.isoformat(),
|
"timestamp": point_time.isoformat(),
|
||||||
"online_count": 0,
|
"online_count": 0,
|
||||||
"playing_count": 0,
|
"playing_count": 0
|
||||||
"peak_online": 0,
|
|
||||||
"peak_playing": 0,
|
|
||||||
"total_samples": 0,
|
|
||||||
}
|
}
|
||||||
fill_points.append(json.dumps(history_point))
|
fill_points.append(json.dumps(history_point))
|
||||||
|
|
||||||
@@ -388,36 +385,40 @@ class EnhancedIntervalStatsManager:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def finalize_interval() -> IntervalStats | None:
|
async def finalize_interval() -> IntervalStats | None:
|
||||||
"""完成当前区间统计并保存到历史"""
|
"""完成上一个已结束的区间统计并保存到历史"""
|
||||||
redis_sync = get_redis_message()
|
redis_sync = get_redis_message()
|
||||||
redis_async = get_redis()
|
redis_async = get_redis()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
current_interval = (
|
# 获取上一个已完成区间(当前区间的前一个)
|
||||||
await EnhancedIntervalStatsManager.get_current_interval_info()
|
current_start, current_end = EnhancedIntervalStatsManager.get_current_interval_boundaries()
|
||||||
|
# 上一个区间开始时间是当前区间开始时间减去30分钟
|
||||||
|
previous_start = current_start - timedelta(minutes=30)
|
||||||
|
previous_end = current_start # 上一个区间的结束时间就是当前区间的开始时间
|
||||||
|
|
||||||
|
interval_key = EnhancedIntervalStatsManager.generate_interval_key(previous_start)
|
||||||
|
|
||||||
|
previous_interval = IntervalInfo(
|
||||||
|
start_time=previous_start,
|
||||||
|
end_time=previous_end,
|
||||||
|
interval_key=interval_key
|
||||||
)
|
)
|
||||||
|
|
||||||
# 最后一次更新统计
|
|
||||||
await EnhancedIntervalStatsManager._update_interval_stats()
|
|
||||||
|
|
||||||
# 获取最终统计数据
|
# 获取最终统计数据
|
||||||
stats_data = await _redis_exec(
|
stats_data = await _redis_exec(
|
||||||
redis_sync.get, current_interval.interval_key
|
redis_sync.get, previous_interval.interval_key
|
||||||
)
|
)
|
||||||
if not stats_data:
|
if not stats_data:
|
||||||
logger.warning("No interval stats found to finalize")
|
logger.warning(f"No interval stats found to finalize for {previous_interval.start_time.strftime('%H:%M')}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
stats = IntervalStats.from_dict(json.loads(stats_data))
|
stats = IntervalStats.from_dict(json.loads(stats_data))
|
||||||
|
|
||||||
# 创建历史记录点(使用区间结束时间作为时间戳,确保时间对齐)
|
# 创建历史记录点(使用区间开始时间作为时间戳)
|
||||||
history_point = {
|
history_point = {
|
||||||
"timestamp": current_interval.end_time.isoformat(),
|
"timestamp": previous_interval.start_time.isoformat(),
|
||||||
"online_count": stats.unique_online_users,
|
"online_count": stats.unique_online_users,
|
||||||
"playing_count": stats.unique_playing_users,
|
"playing_count": stats.unique_playing_users
|
||||||
"peak_online": stats.peak_online_count,
|
|
||||||
"peak_playing": stats.peak_playing_count,
|
|
||||||
"total_samples": stats.total_samples,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# 添加到历史记录
|
# 添加到历史记录
|
||||||
|
|||||||
@@ -59,25 +59,26 @@ class StatsScheduler:
|
|||||||
|
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
# 计算下次记录时间(下个30分钟整点)
|
# 计算下次区间结束时间
|
||||||
now = datetime.utcnow()
|
now = datetime.utcnow()
|
||||||
|
|
||||||
# 计算当前区间边界
|
# 计算当前区间的结束时间
|
||||||
current_minute = (now.minute // 30) * 30
|
current_minute = (now.minute // 30) * 30
|
||||||
current_interval_end = now.replace(
|
current_interval_end = now.replace(minute=current_minute, second=0, microsecond=0) + timedelta(minutes=30)
|
||||||
minute=current_minute, second=0, microsecond=0
|
|
||||||
) + timedelta(minutes=30)
|
# 如果当前时间已经超过了当前区间结束时间,说明需要等待下一个区间结束
|
||||||
|
|
||||||
# 如果已经过了当前区间结束时间,立即处理
|
|
||||||
if now >= current_interval_end:
|
if now >= current_interval_end:
|
||||||
current_interval_end += timedelta(minutes=30)
|
current_interval_end += timedelta(minutes=30)
|
||||||
|
|
||||||
# 计算需要等待的时间(到下个区间结束)
|
# 计算需要等待的时间
|
||||||
sleep_seconds = (current_interval_end - now).total_seconds()
|
sleep_seconds = (current_interval_end - now).total_seconds()
|
||||||
|
|
||||||
# 确保至少等待1分钟,最多等待31分钟
|
# 添加小的缓冲时间,确保区间真正结束后再处理
|
||||||
sleep_seconds = max(min(sleep_seconds, 31 * 60), 60)
|
sleep_seconds += 10 # 额外等待10秒
|
||||||
|
|
||||||
|
# 限制等待时间范围
|
||||||
|
sleep_seconds = max(min(sleep_seconds, 32 * 60), 10)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Next interval finalization in {sleep_seconds / 60:.1f} minutes at {current_interval_end.strftime('%H:%M:%S')}"
|
f"Next interval finalization in {sleep_seconds / 60:.1f} minutes at {current_interval_end.strftime('%H:%M:%S')}"
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user