From 5fe3f36055fdadb03a4e924e52fa48b152c050cf Mon Sep 17 00:00:00 2001 From: MingxuanGame Date: Sat, 9 Aug 2025 14:34:46 +0000 Subject: [PATCH] feat(daily-challenge): complete daily-challenge --- app/database/score.py | 1 + app/dependencies/database.py | 4 +- app/models/metadata_hub.py | 32 ++++++- app/router/score.py | 6 +- app/service/subscriber.py | 20 ---- app/service/subscribers/base.py | 48 ++++++++++ app/service/subscribers/score_processed.py | 87 +++++++++++++++++ app/signalr/hub/metadata.py | 105 +++++++++++++++++++++ app/signalr/hub/multiplayer.py | 1 - 9 files changed, 279 insertions(+), 25 deletions(-) delete mode 100644 app/service/subscriber.py create mode 100644 app/service/subscribers/base.py create mode 100644 app/service/subscribers/score_processed.py diff --git a/app/database/score.py b/app/database/score.py index 37b96a3..289660e 100644 --- a/app/database/score.py +++ b/app/database/score.py @@ -696,4 +696,5 @@ async def process_score( await session.refresh(score) await session.refresh(score_token) await session.refresh(user) + await redis.publish("score:processed", score.id) return score diff --git a/app/dependencies/database.py b/app/dependencies/database.py index a6b6e5c..e74af93 100644 --- a/app/dependencies/database.py +++ b/app/dependencies/database.py @@ -40,5 +40,5 @@ def get_redis(): return redis_client -def get_redis_pubsub(channel: str | None = None): - return redis_client.pubsub(ignore_subscribe_messages=True, channel=channel) +def get_redis_pubsub(): + return redis_client.pubsub() diff --git a/app/models/metadata_hub.py b/app/models/metadata_hub.py index 7ef2b7a..8bf237d 100644 --- a/app/models/metadata_hub.py +++ b/app/models/metadata_hub.py @@ -5,7 +5,9 @@ from typing import ClassVar, Literal from app.models.signalr import SignalRUnionMessage, UserState -from pydantic import BaseModel +from pydantic import BaseModel, Field + +TOTAL_SCORE_DISTRIBUTION_BINS = 13 class _UserActivity(SignalRUnionMessage): ... @@ -96,6 +98,7 @@ UserActivity = ( | ModdingBeatmap | TestingBeatmap | InDailyChallengeLobby + | PlayingDailyChallenge ) @@ -127,3 +130,30 @@ class OnlineStatus(IntEnum): class DailyChallengeInfo(BaseModel): room_id: int + + +class MultiplayerPlaylistItemStats(BaseModel): + playlist_item_id: int = 0 + total_score_distribution: list[int] = Field( + default_factory=list, + min_length=TOTAL_SCORE_DISTRIBUTION_BINS, + max_length=TOTAL_SCORE_DISTRIBUTION_BINS, + ) + cumulative_score: int = 0 + last_processed_score_id: int = 0 + + +class MultiplayerRoomStats(BaseModel): + room_id: int + playlist_item_stats: dict[int, MultiplayerPlaylistItemStats] = Field( + default_factory=dict + ) + + +class MultiplayerRoomScoreSetEvent(BaseModel): + room_id: int + playlist_item_id: int + score_id: int + user_id: int + total_score: int + new_rank: int | None = None diff --git a/app/router/score.py b/app/router/score.py index 506ebac..d826fd0 100644 --- a/app/router/score.py +++ b/app/router/score.py @@ -464,9 +464,13 @@ async def show_playlist_score( session: AsyncSession = Depends(get_db), redis: Redis = Depends(get_redis), ): + room = await session.get(Room, room_id) + if not room: + raise HTTPException(status_code=404, detail="Room not found") + start_time = time.time() score_record = None - completed = False + completed = room.category != RoomCategory.REALTIME while time.time() - start_time < READ_SCORE_TIMEOUT: if score_record is None: score_record = ( diff --git a/app/service/subscriber.py b/app/service/subscriber.py deleted file mode 100644 index 39478ab..0000000 --- a/app/service/subscriber.py +++ /dev/null @@ -1,20 +0,0 @@ -from __future__ import annotations - -from collections.abc import Awaitable, Callable -from typing import Any - -from app.dependencies.database import get_redis_pubsub - - -class RedisSubscriber: - def __init__(self, channel: str): - self.pubsub = get_redis_pubsub(channel) - self.handlers: dict[str, list[Callable[[str, str], Awaitable[Any]]]] = {} - - async def listen(self): - async for message in self.pubsub.listen(): - if message is not None and message["type"] == "message": - method = self.handlers.get(message["channel"]) - if method: - for handler in method: - await handler(message["channel"], message["data"]) diff --git a/app/service/subscribers/base.py b/app/service/subscribers/base.py new file mode 100644 index 0000000..144dfd0 --- /dev/null +++ b/app/service/subscribers/base.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from typing import Any + +from app.dependencies.database import get_redis_pubsub + + +class RedisSubscriber: + def __init__(self): + self.pubsub = get_redis_pubsub() + self.handlers: dict[str, list[Callable[[str, str], Awaitable[Any]]]] = {} + self.task: asyncio.Task | None = None + + async def subscribe(self, channel: str): + await self.pubsub.subscribe(channel) + if channel not in self.handlers: + self.handlers[channel] = [] + + async def unsubscribe(self, channel: str): + if channel in self.handlers: + del self.handlers[channel] + await self.pubsub.unsubscribe(channel) + + async def listen(self): + while True: + message = await self.pubsub.get_message( + ignore_subscribe_messages=True, timeout=None + ) + if message is not None and message["type"] == "message": + method = self.handlers.get(message["channel"]) + if method: + await asyncio.gather( + *[ + handler(message["channel"], message["data"]) + for handler in method + ] + ) + + def start(self): + if self.task is None or self.task.done(): + self.task = asyncio.create_task(self.listen()) + + def stop(self): + if self.task is not None and not self.task.done(): + self.task.cancel() + self.task = None diff --git a/app/service/subscribers/score_processed.py b/app/service/subscribers/score_processed.py new file mode 100644 index 0000000..b1bc5bd --- /dev/null +++ b/app/service/subscribers/score_processed.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from app.database import PlaylistBestScore, Score +from app.database.playlist_best_score import get_position +from app.dependencies.database import engine +from app.models.metadata_hub import MultiplayerRoomScoreSetEvent + +from .base import RedisSubscriber + +from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession + +if TYPE_CHECKING: + from app.signalr.hub import MetadataHub + + +CHANNEL = "score:processed" + + +class ScoreSubscriber(RedisSubscriber): + def __init__(self): + super().__init__() + self.room_subscriber: dict[int, list[int]] = {} + self.metadata_hub: "MetadataHub | None " = None + self.subscribed = False + self.handlers[CHANNEL] = [self._handler] + + async def subscribe_room_score(self, room_id: int, user_id: int): + if room_id not in self.room_subscriber: + await self.subscribe(CHANNEL) + self.start() + self.room_subscriber.setdefault(room_id, []).append(user_id) + + async def unsubscribe_room_score(self, room_id: int, user_id: int): + if room_id in self.room_subscriber: + self.room_subscriber[room_id].remove(user_id) + if not self.room_subscriber[room_id]: + del self.room_subscriber[room_id] + + async def _notify_room_score_processed(self, score_id: int): + if not self.metadata_hub: + return + async with AsyncSession(engine) as session: + score = await session.get(Score, score_id) + if ( + not score + or not score.passed + or score.room_id is None + or score.playlist_item_id is None + ): + return + if not self.room_subscriber.get(score.room_id, []): + return + + new_rank = None + user_best = ( + await session.exec( + select(PlaylistBestScore).where( + PlaylistBestScore.user_id == score.user_id, + PlaylistBestScore.room_id == score.room_id, + ) + ) + ).first() + if user_best and user_best.score_id == score_id: + new_rank = await get_position( + user_best.room_id, + user_best.playlist_id, + user_best.score_id, + session, + ) + + event = MultiplayerRoomScoreSetEvent( + room_id=score.room_id, + playlist_item_id=score.playlist_item_id, + score_id=score_id, + user_id=score.user_id, + total_score=score.total_score, + new_rank=new_rank, + ) + await self.metadata_hub.notify_room_score_processed(event) + + async def _handler(self, channel: str, data: str): + score_id = int(data) + if self.metadata_hub: + await self._notify_room_score_processed(score_id) diff --git a/app/signalr/hub/metadata.py b/app/signalr/hub/metadata.py index ef21c93..f81aefa 100644 --- a/app/signalr/hub/metadata.py +++ b/app/signalr/hub/metadata.py @@ -1,20 +1,30 @@ from __future__ import annotations import asyncio +from collections import defaultdict from collections.abc import Coroutine from datetime import UTC, datetime +import math from typing import override +from app.calculator import clamp from app.database import Relationship, RelationshipType, User +from app.database.playlist_best_score import PlaylistBestScore +from app.database.playlists import Playlist from app.database.room import Room from app.dependencies.database import engine, get_redis from app.models.metadata_hub import ( + TOTAL_SCORE_DISTRIBUTION_BINS, DailyChallengeInfo, MetadataClientState, + MultiplayerPlaylistItemStats, + MultiplayerRoomScoreSetEvent, + MultiplayerRoomStats, OnlineStatus, UserActivity, ) from app.models.room import RoomCategory +from app.service.subscribers.score_processed import ScoreSubscriber from .hub import Client, Hub @@ -27,11 +37,33 @@ ONLINE_PRESENCE_WATCHERS_GROUP = "metadata:online-presence-watchers" class MetadataHub(Hub[MetadataClientState]): def __init__(self) -> None: super().__init__() + self.subscriber = ScoreSubscriber() + self.subscriber.metadata_hub = self + self._daily_challenge_stats: MultiplayerRoomStats | None = None + self._today = datetime.now(UTC).date() + self._lock = asyncio.Lock() + + def get_daily_challenge_stats( + self, daily_challenge_room: int + ) -> MultiplayerRoomStats: + if ( + self._daily_challenge_stats is None + or self._today != datetime.now(UTC).date() + ): + self._daily_challenge_stats = MultiplayerRoomStats( + room_id=daily_challenge_room, + playlist_item_stats={}, + ) + return self._daily_challenge_stats @staticmethod def online_presence_watchers_group() -> str: return ONLINE_PRESENCE_WATCHERS_GROUP + @staticmethod + def room_watcher_group(room_id: int) -> str: + return f"metadata:multiplayer-room-watchers:{room_id}" + def broadcast_tasks( self, user_id: int, store: MetadataClientState | None ) -> set[Coroutine]: @@ -186,3 +218,76 @@ class MetadataHub(Hub[MetadataClientState]): async def EndWatchingUserPresence(self, client: Client) -> None: self.remove_from_group(client, self.online_presence_watchers_group()) + + async def notify_room_score_processed(self, event: MultiplayerRoomScoreSetEvent): + await self.broadcast_group_call( + self.room_watcher_group(event.room_id), "MultiplayerRoomScoreSet", event + ) + + async def BeginWatchingMultiplayerRoom(self, client: Client, room_id: int): + self.add_to_group(client, self.room_watcher_group(room_id)) + await self.subscriber.subscribe_room_score(room_id, client.user_id) + stats = self.get_daily_challenge_stats(room_id) + await self.update_daily_challenge_stats(stats) + return list(stats.playlist_item_stats.values()) + + async def update_daily_challenge_stats(self, stats: MultiplayerRoomStats) -> None: + async with AsyncSession(engine) as session: + playlist_ids = ( + await session.exec( + select(Playlist.id).where( + Playlist.room_id == stats.room_id, + ) + ) + ).all() + for playlist_id in playlist_ids: + item = stats.playlist_item_stats.get(playlist_id, None) + if item is None: + item = MultiplayerPlaylistItemStats( + playlist_item_id=playlist_id, + total_score_distribution=[0] * TOTAL_SCORE_DISTRIBUTION_BINS, + cumulative_score=0, + last_processed_score_id=0, + ) + stats.playlist_item_stats[playlist_id] = item + last_processed_score_id = item.last_processed_score_id + scores = ( + await session.exec( + select(PlaylistBestScore).where( + PlaylistBestScore.room_id == stats.room_id, + PlaylistBestScore.playlist_id == playlist_id, + PlaylistBestScore.score_id > last_processed_score_id, + ) + ) + ).all() + if len(scores) == 0: + continue + + async with self._lock: + if item.last_processed_score_id == last_processed_score_id: + totals = defaultdict(int) + for score in scores: + bin_index = int( + clamp( + math.floor(score.total_score / 100000), + 0, + TOTAL_SCORE_DISTRIBUTION_BINS - 1, + ) + ) + totals[bin_index] += 1 + + item.cumulative_score += sum( + score.total_score for score in scores + ) + + for j in range(TOTAL_SCORE_DISTRIBUTION_BINS): + item.total_score_distribution[j] += totals.get(j, 0) + + if scores: + item.last_processed_score_id = max( + score.score_id for score in scores + ) + + async def EndWatchingMultiplayerRoom(self, client: Client, room_id: int): + self.remove_from_group(client, self.room_watcher_group(room_id)) + await self.subscriber.unsubscribe_room_score(room_id, client.user_id) diff --git a/app/signalr/hub/multiplayer.py b/app/signalr/hub/multiplayer.py index 9d6ad8e..e397031 100644 --- a/app/signalr/hub/multiplayer.py +++ b/app/signalr/hub/multiplayer.py @@ -1082,7 +1082,6 @@ class MultiplayerHub(Hub[MultiplayerClientState]): ) async def ChangeSettings(self, client: Client, settings: MultiplayerRoomSettings): - print(settings) store = self.get_or_create_state(client) if store.room_id == 0: raise InvokeException("You are not in a room")