feat(signalr): support json & msgpack protocol for all hubs
This commit is contained in:
@@ -2,14 +2,12 @@ from __future__ import annotations
|
||||
|
||||
from abc import abstractmethod
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
import inspect
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from app.config import settings
|
||||
from app.log import logger
|
||||
from app.models.signalr import UserState, _by_index
|
||||
from app.models.signalr import UserState
|
||||
from app.signalr.exception import InvokeException
|
||||
from app.signalr.packet import (
|
||||
ClosePacket,
|
||||
@@ -23,7 +21,6 @@ from app.signalr.store import ResultStore
|
||||
from app.signalr.utils import get_signature
|
||||
|
||||
from fastapi import WebSocket
|
||||
from pydantic import BaseModel
|
||||
from starlette.websockets import WebSocketDisconnect
|
||||
|
||||
|
||||
@@ -51,7 +48,7 @@ class Client:
|
||||
self.connection_id = connection_id
|
||||
self.connection_token = connection_token
|
||||
self.connection = connection
|
||||
self.procotol = protocol
|
||||
self.protocol = protocol
|
||||
self._listen_task: asyncio.Task | None = None
|
||||
self._ping_task: asyncio.Task | None = None
|
||||
self._store = ResultStore()
|
||||
@@ -64,14 +61,14 @@ class Client:
|
||||
return int(self.connection_id)
|
||||
|
||||
async def send_packet(self, packet: Packet):
|
||||
await self.connection.send_bytes(self.procotol.encode(packet))
|
||||
await self.connection.send_bytes(self.protocol.encode(packet))
|
||||
|
||||
async def receive_packets(self) -> list[Packet]:
|
||||
message = await self.connection.receive()
|
||||
d = message.get("bytes") or message.get("text", "").encode()
|
||||
if not d:
|
||||
return []
|
||||
return self.procotol.decode(d)
|
||||
return self.protocol.decode(d)
|
||||
|
||||
async def _ping(self):
|
||||
while True:
|
||||
@@ -265,14 +262,9 @@ class Hub[TState: UserState]:
|
||||
for name, param in signature.parameters.items():
|
||||
if name == "self" or param.annotation is Client:
|
||||
continue
|
||||
if issubclass(param.annotation, BaseModel):
|
||||
call_params.append(param.annotation.model_validate(args.pop(0)))
|
||||
elif inspect.isclass(param.annotation) and issubclass(
|
||||
param.annotation, Enum
|
||||
):
|
||||
call_params.append(_by_index(args.pop(0), param.annotation))
|
||||
else:
|
||||
call_params.append(args.pop(0))
|
||||
call_params.append(
|
||||
client.protocol.validate_object(args.pop(0), param.annotation)
|
||||
)
|
||||
return await method_(client, *call_params)
|
||||
|
||||
async def call(self, client: Client, method: str, *args: Any) -> Any:
|
||||
|
||||
@@ -12,7 +12,6 @@ from app.models.metadata_hub import MetadataClientState, OnlineStatus, UserActiv
|
||||
|
||||
from .hub import Client, Hub
|
||||
|
||||
from pydantic import TypeAdapter
|
||||
from sqlmodel import select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
@@ -32,7 +31,7 @@ class MetadataHub(Hub[MetadataClientState]):
|
||||
) -> set[Coroutine]:
|
||||
if store is not None and not store.pushable:
|
||||
return set()
|
||||
data = store.to_dict() if store else None
|
||||
data = store.for_push if store else None
|
||||
return {
|
||||
self.broadcast_group_call(
|
||||
self.online_presence_watchers_group(),
|
||||
@@ -103,7 +102,7 @@ class MetadataHub(Hub[MetadataClientState]):
|
||||
self.friend_presence_watchers_group(friend_id),
|
||||
"FriendPresenceUpdated",
|
||||
friend_id,
|
||||
friend_state.to_dict(),
|
||||
friend_state if friend_state.pushable else None,
|
||||
)
|
||||
)
|
||||
await asyncio.gather(*tasks)
|
||||
@@ -123,27 +122,24 @@ class MetadataHub(Hub[MetadataClientState]):
|
||||
client,
|
||||
"UserPresenceUpdated",
|
||||
user_id,
|
||||
store.to_dict(),
|
||||
store.for_push,
|
||||
)
|
||||
)
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
async def UpdateActivity(self, client: Client, activity_dict: dict | None) -> None:
|
||||
async def UpdateActivity(
|
||||
self, client: Client, activity: UserActivity | None
|
||||
) -> None:
|
||||
user_id = int(client.connection_id)
|
||||
activity = (
|
||||
TypeAdapter(UserActivity).validate_python(activity_dict)
|
||||
if activity_dict
|
||||
else None
|
||||
)
|
||||
store = self.get_or_create_state(client)
|
||||
store.user_activity = activity
|
||||
store.activity = activity
|
||||
tasks = self.broadcast_tasks(user_id, store)
|
||||
tasks.add(
|
||||
self.call_noblock(
|
||||
client,
|
||||
"UserPresenceUpdated",
|
||||
user_id,
|
||||
store.to_dict(),
|
||||
store.for_push,
|
||||
)
|
||||
)
|
||||
await asyncio.gather(*tasks)
|
||||
@@ -155,7 +151,7 @@ class MetadataHub(Hub[MetadataClientState]):
|
||||
client,
|
||||
"UserPresenceUpdated",
|
||||
user_id,
|
||||
store.to_dict(),
|
||||
store,
|
||||
)
|
||||
for user_id, store in self.state.items()
|
||||
if store.pushable
|
||||
|
||||
@@ -13,8 +13,7 @@ from app.database.score_token import ScoreToken
|
||||
from app.dependencies.database import engine
|
||||
from app.models.beatmap import BeatmapRankStatus
|
||||
from app.models.mods import mods_to_int
|
||||
from app.models.score import LegacyReplaySoloScoreInfo, ScoreStatisticsInt
|
||||
from app.models.signalr import serialize_to_list
|
||||
from app.models.score import LegacyReplaySoloScoreInfo, ScoreStatistics
|
||||
from app.models.spectator_hub import (
|
||||
APIUser,
|
||||
FrameDataBundle,
|
||||
@@ -69,8 +68,8 @@ def save_replay(
|
||||
md5: str,
|
||||
username: str,
|
||||
score: Score,
|
||||
statistics: ScoreStatisticsInt,
|
||||
maximum_statistics: ScoreStatisticsInt,
|
||||
statistics: ScoreStatistics,
|
||||
maximum_statistics: ScoreStatistics,
|
||||
frames: list[LegacyReplayFrame],
|
||||
) -> None:
|
||||
data = bytearray()
|
||||
@@ -107,8 +106,8 @@ def save_replay(
|
||||
last_time = 0
|
||||
for frame in frames:
|
||||
frame_strs.append(
|
||||
f"{frame.time - last_time}|{frame.x or 0.0}"
|
||||
f"|{frame.y or 0.0}|{frame.button_state}"
|
||||
f"{frame.time - last_time}|{frame.mouse_x or 0.0}"
|
||||
f"|{frame.mouse_y or 0.0}|{frame.button_state}"
|
||||
)
|
||||
last_time = frame.time
|
||||
frame_strs.append("-12345|0|0|0")
|
||||
@@ -165,9 +164,7 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
|
||||
async def on_client_connect(self, client: Client) -> None:
|
||||
tasks = [
|
||||
self.call_noblock(
|
||||
client, "UserBeganPlaying", user_id, serialize_to_list(store.state)
|
||||
)
|
||||
self.call_noblock(client, "UserBeganPlaying", user_id, store.state)
|
||||
for user_id, store in self.state.items()
|
||||
if store.state is not None
|
||||
]
|
||||
@@ -214,7 +211,7 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
self.group_id(user_id),
|
||||
"UserBeganPlaying",
|
||||
user_id,
|
||||
serialize_to_list(state),
|
||||
state,
|
||||
)
|
||||
|
||||
async def SendFrameData(self, client: Client, frame_data: FrameDataBundle) -> None:
|
||||
@@ -222,7 +219,7 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
state = self.get_or_create_state(client)
|
||||
if not state.score:
|
||||
return
|
||||
state.score.score_info.acc = frame_data.header.acc
|
||||
state.score.score_info.accuracy = frame_data.header.accuracy
|
||||
state.score.score_info.combo = frame_data.header.combo
|
||||
state.score.score_info.max_combo = frame_data.header.max_combo
|
||||
state.score.score_info.statistics = frame_data.header.statistics
|
||||
@@ -233,7 +230,7 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
self.group_id(user_id),
|
||||
"UserSentFrames",
|
||||
user_id,
|
||||
frame_data.model_dump(),
|
||||
frame_data,
|
||||
)
|
||||
|
||||
async def EndPlaySession(self, client: Client, state: SpectatorState) -> None:
|
||||
@@ -316,7 +313,7 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
self.group_id(user_id),
|
||||
"UserFinishedPlaying",
|
||||
user_id,
|
||||
serialize_to_list(state) if state else None,
|
||||
state,
|
||||
)
|
||||
|
||||
async def StartWatchingUser(self, client: Client, target_id: int) -> None:
|
||||
@@ -327,7 +324,7 @@ class SpectatorHub(Hub[StoreClientState]):
|
||||
client,
|
||||
"UserBeganPlaying",
|
||||
target_id,
|
||||
serialize_to_list(target_store.state),
|
||||
target_store.state,
|
||||
)
|
||||
store = self.get_or_create_state(client)
|
||||
store.watched_user.add(target_id)
|
||||
|
||||
Reference in New Issue
Block a user