diff --git a/app/models/stats.py b/app/models/stats.py new file mode 100644 index 0000000..4e277b1 --- /dev/null +++ b/app/models/stats.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel + + +class OnlineStats(BaseModel): + """在线统计信息""" + registered_users: int + online_users: int + playing_users: int + timestamp: datetime + + +class OnlineHistoryPoint(BaseModel): + """在线历史数据点""" + timestamp: datetime + online_count: int + playing_count: int + + +class OnlineHistoryStats(BaseModel): + """24小时在线历史统计""" + history: list[OnlineHistoryPoint] + current_stats: OnlineStats + + +class ServerStatistics(BaseModel): + """服务器统计信息""" + total_users: int + online_users: int + playing_users: int + spectating_users: int + multiplayer_users: int + last_updated: datetime diff --git a/app/router/v2/router.py b/app/router/v2/router.py index 0bd0b4b..ffd22c4 100644 --- a/app/router/v2/router.py +++ b/app/router/v2/router.py @@ -3,3 +3,6 @@ from __future__ import annotations from fastapi import APIRouter router = APIRouter(prefix="/api/v2") + +# 导入所有子路由模块来注册路由 +from . import stats # 统计路由 diff --git a/app/router/v2/stats.py b/app/router/v2/stats.py new file mode 100644 index 0000000..7c2a774 --- /dev/null +++ b/app/router/v2/stats.py @@ -0,0 +1,237 @@ +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta +import json +from typing import Any +from concurrent.futures import ThreadPoolExecutor + +from app.dependencies.database import get_redis, get_redis_message +from app.log import logger + +from .router import router + +from fastapi import APIRouter +from pydantic import BaseModel + +# Redis key constants +REDIS_ONLINE_USERS_KEY = "server:online_users" +REDIS_PLAYING_USERS_KEY = "server:playing_users" +REDIS_REGISTERED_USERS_KEY = "server:registered_users" +REDIS_ONLINE_HISTORY_KEY = "server:online_history" + +# 线程池用于同步Redis操作 +_executor = ThreadPoolExecutor(max_workers=2) + +async def _redis_exec(func, *args, **kwargs): + """在线程池中执行同步Redis操作""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(_executor, func, *args, **kwargs) + +class ServerStats(BaseModel): + """服务器统计信息响应模型""" + registered_users: int + online_users: int + playing_users: int + timestamp: datetime + +class OnlineHistoryPoint(BaseModel): + """在线历史数据点""" + timestamp: datetime + online_count: int + playing_count: int + +class OnlineHistoryResponse(BaseModel): + """24小时在线历史响应模型""" + history: list[OnlineHistoryPoint] + current_stats: ServerStats + +@router.get("/stats", response_model=ServerStats, tags=["统计"]) +async def get_server_stats() -> ServerStats: + """ + 获取服务器实时统计信息 + + 返回服务器注册用户数、在线用户数、正在游玩用户数等实时统计信息 + """ + redis = get_redis() + + try: + # 并行获取所有统计数据 + registered_count, online_count, playing_count = await asyncio.gather( + _get_registered_users_count(redis), + _get_online_users_count(redis), + _get_playing_users_count(redis) + ) + + return ServerStats( + registered_users=registered_count, + online_users=online_count, + playing_users=playing_count, + timestamp=datetime.utcnow() + ) + except Exception as e: + logger.error(f"Error getting server stats: {e}") + # 返回默认值 + return ServerStats( + registered_users=0, + online_users=0, + playing_users=0, + timestamp=datetime.utcnow() + ) + +@router.get("/stats/history", response_model=OnlineHistoryResponse, tags=["统计"]) +async def get_online_history() -> OnlineHistoryResponse: + """ + 获取最近24小时在线统计历史 + + 返回过去24小时内每小时的在线用户数和游玩用户数统计 + """ + redis = get_redis() + + try: + # 获取历史数据 - 使用同步Redis客户端 + redis_sync = get_redis_message() + history_data = await _redis_exec(redis_sync.lrange, REDIS_ONLINE_HISTORY_KEY, 0, -1) + history_points = [] + + # 处理历史数据 + for data in history_data: + try: + point_data = json.loads(data) + history_points.append(OnlineHistoryPoint( + timestamp=datetime.fromisoformat(point_data["timestamp"]), + online_count=point_data["online_count"], + playing_count=point_data["playing_count"] + )) + except (json.JSONDecodeError, KeyError, ValueError) as e: + logger.warning(f"Invalid history data point: {data}, error: {e}") + continue + + # 按时间排序(最新的在前) + history_points.sort(key=lambda x: x.timestamp, reverse=True) + + # 获取当前统计信息 + current_stats = await get_server_stats() + + return OnlineHistoryResponse( + history=history_points, + current_stats=current_stats + ) + except Exception as e: + logger.error(f"Error getting online history: {e}") + # 返回空历史和当前状态 + current_stats = await get_server_stats() + return OnlineHistoryResponse( + history=[], + current_stats=current_stats + ) + +async def _get_registered_users_count(redis) -> int: + """获取注册用户总数(从缓存)""" + try: + count = await redis.get(REDIS_REGISTERED_USERS_KEY) + return int(count) if count else 0 + except Exception as e: + logger.error(f"Error getting registered users count: {e}") + return 0 + +async def _get_online_users_count(redis) -> int: + """获取当前在线用户数""" + try: + count = await redis.scard(REDIS_ONLINE_USERS_KEY) + return count + except Exception as e: + logger.error(f"Error getting online users count: {e}") + return 0 + +async def _get_playing_users_count(redis) -> int: + """获取当前游玩用户数""" + try: + count = await redis.scard(REDIS_PLAYING_USERS_KEY) + return count + except Exception as e: + logger.error(f"Error getting playing users count: {e}") + return 0 + +# 统计更新功能 +async def update_registered_users_count() -> None: + """更新注册用户数缓存""" + from app.dependencies.database import with_db + from app.database import User + from app.const import BANCHOBOT_ID + from sqlmodel import select, func + + redis = get_redis() + try: + async with with_db() as db: + # 排除机器人用户(BANCHOBOT_ID) + result = await db.exec( + select(func.count()).select_from(User).where(User.id != BANCHOBOT_ID) + ) + count = result.first() + await redis.set(REDIS_REGISTERED_USERS_KEY, count or 0, ex=300) # 5分钟过期 + logger.debug(f"Updated registered users count: {count}") + except Exception as e: + logger.error(f"Error updating registered users count: {e}") + +async def add_online_user(user_id: int) -> None: + """添加在线用户""" + redis_sync = get_redis_message() + redis_async = get_redis() + try: + await _redis_exec(redis_sync.sadd, REDIS_ONLINE_USERS_KEY, str(user_id)) + await redis_async.expire(REDIS_ONLINE_USERS_KEY, 3600) # 1小时过期 + except Exception as e: + logger.error(f"Error adding online user {user_id}: {e}") + +async def remove_online_user(user_id: int) -> None: + """移除在线用户""" + redis_sync = get_redis_message() + try: + await _redis_exec(redis_sync.srem, REDIS_ONLINE_USERS_KEY, str(user_id)) + await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, str(user_id)) + except Exception as e: + logger.error(f"Error removing online user {user_id}: {e}") + +async def add_playing_user(user_id: int) -> None: + """添加游玩用户""" + redis_sync = get_redis_message() + redis_async = get_redis() + try: + await _redis_exec(redis_sync.sadd, REDIS_PLAYING_USERS_KEY, str(user_id)) + await redis_async.expire(REDIS_PLAYING_USERS_KEY, 3600) # 1小时过期 + except Exception as e: + logger.error(f"Error adding playing user {user_id}: {e}") + +async def remove_playing_user(user_id: int) -> None: + """移除游玩用户""" + redis_sync = get_redis_message() + try: + await _redis_exec(redis_sync.srem, REDIS_PLAYING_USERS_KEY, str(user_id)) + except Exception as e: + logger.error(f"Error removing playing user {user_id}: {e}") + +async def record_hourly_stats() -> None: + """记录每小时统计数据""" + redis_sync = get_redis_message() + redis_async = get_redis() + try: + online_count = await _get_online_users_count(redis_async) + playing_count = await _get_playing_users_count(redis_async) + + history_point = { + "timestamp": datetime.utcnow().isoformat(), + "online_count": online_count, + "playing_count": playing_count + } + + # 添加到历史记录 + await _redis_exec(redis_sync.lpush, REDIS_ONLINE_HISTORY_KEY, json.dumps(history_point)) + # 只保留48个数据点 + await _redis_exec(redis_sync.ltrim, REDIS_ONLINE_HISTORY_KEY, 0, 47) + # 设置过期时间为25小时 + await redis_async.expire(REDIS_ONLINE_HISTORY_KEY, 25 * 3600) + + logger.debug(f"Recorded hourly stats: online={online_count}, playing={playing_count}") + except Exception as e: + logger.error(f"Error recording hourly stats: {e}") diff --git a/app/service/stats_scheduler.py b/app/service/stats_scheduler.py new file mode 100644 index 0000000..fef3081 --- /dev/null +++ b/app/service/stats_scheduler.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import asyncio +from datetime import datetime + +from app.log import logger +from app.router.v2.stats import record_hourly_stats, update_registered_users_count + + +class StatsScheduler: + """统计数据调度器""" + + def __init__(self): + self._running = False + self._stats_task: asyncio.Task | None = None + self._registered_task: asyncio.Task | None = None + + def start(self) -> None: + """启动调度器""" + if self._running: + return + + self._running = True + self._stats_task = asyncio.create_task(self._stats_loop()) + self._registered_task = asyncio.create_task(self._registered_users_loop()) + logger.info("Stats scheduler started") + + def stop(self) -> None: + """停止调度器""" + if not self._running: + return + + self._running = False + + if self._stats_task: + self._stats_task.cancel() + if self._registered_task: + self._registered_task.cancel() + + logger.info("Stats scheduler stopped") + + async def _stats_loop(self) -> None: + """统计数据记录循环 - 每30分钟记录一次""" + while self._running: + try: + await record_hourly_stats() + logger.debug("Recorded hourly statistics") + except Exception as e: + logger.error(f"Error in stats loop: {e}") + + # 等待30分钟 + await asyncio.sleep(30 * 60) + + async def _registered_users_loop(self) -> None: + """注册用户数更新循环 - 每5分钟更新一次""" + while self._running: + try: + await update_registered_users_count() + logger.debug("Updated registered users count") + except Exception as e: + logger.error(f"Error in registered users loop: {e}") + + # 等待5分钟 + await asyncio.sleep(5 * 60) + + +# 全局调度器实例 +stats_scheduler = StatsScheduler() + + +def start_stats_scheduler() -> None: + """启动统计调度器""" + stats_scheduler.start() + + +def stop_stats_scheduler() -> None: + """停止统计调度器""" + stats_scheduler.stop() diff --git a/app/signalr/hub/metadata.py b/app/signalr/hub/metadata.py index a81c1c8..8efea4f 100644 --- a/app/signalr/hub/metadata.py +++ b/app/signalr/hub/metadata.py @@ -91,8 +91,14 @@ class MetadataHub(Hub[MetadataClientState]): @override async def _clean_state(self, state: MetadataClientState) -> None: + user_id = int(state.connection_id) + + # Remove from online user tracking + from app.router.v2.stats import remove_online_user + asyncio.create_task(remove_online_user(user_id)) + if state.pushable: - await asyncio.gather(*self.broadcast_tasks(int(state.connection_id), None)) + await asyncio.gather(*self.broadcast_tasks(user_id, None)) redis = get_redis() if await redis.exists(f"metadata:online:{state.connection_id}"): await redis.delete(f"metadata:online:{state.connection_id}") @@ -117,6 +123,10 @@ class MetadataHub(Hub[MetadataClientState]): user_id = int(client.connection_id) self.get_or_create_state(client) + # Track online user + from app.router.v2.stats import add_online_user + asyncio.create_task(add_online_user(user_id)) + async with with_db() as session: async with session.begin(): friends = ( diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index 7a0fdd4..11520dd 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -163,6 +163,11 @@ class MultiplayerHub(Hub[MultiplayerClientState]): @override async def _clean_state(self, state: MultiplayerClientState): user_id = int(state.connection_id) + + # Remove from online user tracking + from app.router.v2.stats import remove_online_user + asyncio.create_task(remove_online_user(user_id)) + if state.room_id != 0 and state.room_id in self.rooms: server_room = self.rooms[state.room_id] room = server_room.room @@ -172,6 +177,14 @@ class MultiplayerHub(Hub[MultiplayerClientState]): self.get_client_by_id(str(user_id)), server_room, user ) + async def on_client_connect(self, client: Client) -> None: + """Track online users when connecting to multiplayer hub""" + logger.info(f"[MultiplayerHub] Client {client.user_id} connected") + + # Track online user + from app.router.v2.stats import add_online_user + asyncio.create_task(add_online_user(client.user_id)) + def _ensure_in_room(self, client: Client) -> ServerMultiplayerRoom: store = self.get_or_create_state(client) if store.room_id == 0: diff --git a/app/signalr/hub/spectator.py b/app/signalr/hub/spectator.py index dd228fd..71e2a92 100644 --- a/app/signalr/hub/spectator.py +++ b/app/signalr/hub/spectator.py @@ -169,12 +169,17 @@ class SpectatorHub(Hub[StoreClientState]): Enhanced cleanup based on official osu-server-spectator implementation. Properly notifies watched users when spectator disconnects. """ + user_id = int(state.connection_id) + + # Remove from online and playing tracking + from app.router.v2.stats import remove_online_user + asyncio.create_task(remove_online_user(user_id)) + if state.state: - await self._end_session(int(state.connection_id), state.state, state) + await self._end_session(user_id, state.state, state) # Critical fix: Notify all watched users that this spectator has disconnected # This matches the official CleanUpState implementation - user_id = int(state.connection_id) for watched_user_id in state.watched_user: if (target_client := self.get_client_by_id(str(watched_user_id))) is not None: await self.call_noblock( @@ -191,6 +196,10 @@ class SpectatorHub(Hub[StoreClientState]): """ logger.info(f"[SpectatorHub] Client {client.user_id} connected") + # Track online user + from app.router.v2.stats import add_online_user + asyncio.create_task(add_online_user(client.user_id)) + # Send all current player states to the new client # This matches the official OnConnectedAsync behavior active_states = [] @@ -295,6 +304,10 @@ class SpectatorHub(Hub[StoreClientState]): ) logger.info(f"[SpectatorHub] {client.user_id} began playing {state.beatmap_id}") + # Track playing user + from app.router.v2.stats import add_playing_user + asyncio.create_task(add_playing_user(user_id)) + # # 预缓存beatmap文件以加速后续PP计算 # await self._preload_beatmap_for_pp_calculation(state.beatmap_id) @@ -343,6 +356,11 @@ class SpectatorHub(Hub[StoreClientState]): ) and any(k.is_hit() and v > 0 for k, v in score.score_info.statistics.items()): await self._process_score(store, client) await self._end_session(user_id, state, store) + + # Remove from playing user tracking + from app.router.v2.stats import remove_playing_user + asyncio.create_task(remove_playing_user(user_id)) + store.state = None store.beatmap_status = None store.checksum = None diff --git a/main.py b/main.py index 2f449ea..4a4a417 100644 --- a/main.py +++ b/main.py @@ -33,6 +33,7 @@ from app.service.load_achievements import load_achievements from app.service.osu_rx_statistics import create_rx_statistics from app.service.recalculate import recalculate from app.service.redis_message_system import redis_message_system +from app.service.stats_scheduler import start_stats_scheduler, stop_stats_scheduler # 检查 New Relic 配置文件是否存在,如果存在则初始化 New Relic newrelic_config_path = os.path.join(os.path.dirname(__file__), "newrelic.ini") @@ -79,11 +80,13 @@ async def lifespan(app: FastAPI): await download_service.start_health_check() # 启动下载服务健康检查 await start_cache_scheduler() # 启动缓存调度器 redis_message_system.start() # 启动 Redis 消息系统 + start_stats_scheduler() # 启动统计调度器 load_achievements() # on shutdown yield stop_scheduler() redis_message_system.stop() # 停止 Redis 消息系统 + stop_stats_scheduler() # 停止统计调度器 await stop_cache_scheduler() # 停止缓存调度器 await download_service.stop_health_check() # 停止下载服务健康检查 await engine.dispose() diff --git a/test_stats_api.py b/test_stats_api.py new file mode 100644 index 0000000..d4bf531 --- /dev/null +++ b/test_stats_api.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +""" +服务器统计API测试脚本 +""" + +import asyncio +import json +from datetime import datetime + +import httpx + + +async def test_stats_api(): + """测试统计API""" + base_url = "http://localhost:8000" # 根据实际服务器地址修改 + + async with httpx.AsyncClient() as client: + print("🧪 测试服务器统计API...") + + # 测试服务器统计信息接口 + print("\n1. 测试 /api/v2/stats 端点...") + try: + response = await client.get(f"{base_url}/api/v2/stats") + if response.status_code == 200: + data = response.json() + print(f"✅ 成功获取服务器统计信息:") + print(f" - 注册用户: {data['registered_users']}") + print(f" - 在线用户: {data['online_users']}") + print(f" - 游玩用户: {data['playing_users']}") + print(f" - 更新时间: {data['timestamp']}") + else: + print(f"❌ 请求失败: HTTP {response.status_code}") + print(f" 响应: {response.text}") + except Exception as e: + print(f"❌ 请求异常: {e}") + + # 测试在线历史接口 + print("\n2. 测试 /api/v2/stats/history 端点...") + try: + response = await client.get(f"{base_url}/api/v2/stats/history") + if response.status_code == 200: + data = response.json() + print(f"✅ 成功获取在线历史信息:") + print(f" - 历史数据点数: {len(data['history'])}") + print(f" - 当前统计信息:") + current = data['current_stats'] + print(f" - 注册用户: {current['registered_users']}") + print(f" - 在线用户: {current['online_users']}") + print(f" - 游玩用户: {current['playing_users']}") + + if data['history']: + latest = data['history'][0] + print(f" - 最新历史记录:") + print(f" - 时间: {latest['timestamp']}") + print(f" - 在线数: {latest['online_count']}") + print(f" - 游玩数: {latest['playing_count']}") + else: + print(f" - 暂无历史数据(需要等待调度器记录)") + else: + print(f"❌ 请求失败: HTTP {response.status_code}") + print(f" 响应: {response.text}") + except Exception as e: + print(f"❌ 请求异常: {e}") + + +async def test_internal_functions(): + """测试内部函数""" + print("\n🔧 测试内部Redis函数...") + + try: + from app.router.v2.stats import ( + add_online_user, + remove_online_user, + add_playing_user, + remove_playing_user, + record_hourly_stats, + update_registered_users_count + ) + + # 测试添加用户 + print(" 测试添加在线用户...") + await add_online_user(999999) # 测试用户ID + + print(" 测试添加游玩用户...") + await add_playing_user(999999) + + print(" 测试记录统计数据...") + await record_hourly_stats() + + print(" 测试移除用户...") + await remove_playing_user(999999) + await remove_online_user(999999) + + print(" 测试更新注册用户数...") + await update_registered_users_count() + + print("✅ 内部函数测试完成") + + except Exception as e: + print(f"❌ 内部函数测试异常: {e}") + + +if __name__ == "__main__": + print("🚀 开始测试服务器统计功能...") + + # 首先测试内部函数 + asyncio.run(test_internal_functions()) + + # 然后测试API端点 + asyncio.run(test_stats_api()) + + print("\n✨ 测试完成!")