feat(redis): add subscriber for pub/sub mode
This commit is contained in:
@@ -38,3 +38,7 @@ async def create_tables():
|
|||||||
# Redis 依赖
|
# Redis 依赖
|
||||||
def get_redis():
|
def get_redis():
|
||||||
return redis_client
|
return redis_client
|
||||||
|
|
||||||
|
|
||||||
|
def get_redis_pubsub(channel: str | None = None):
|
||||||
|
return redis_client.pubsub(ignore_subscribe_messages=True, channel=channel)
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ from app.log import logger
|
|||||||
from app.models.metadata_hub import DailyChallengeInfo
|
from app.models.metadata_hub import DailyChallengeInfo
|
||||||
from app.models.mods import APIMod
|
from app.models.mods import APIMod
|
||||||
from app.models.room import RoomCategory
|
from app.models.room import RoomCategory
|
||||||
from app.signalr.hub import MetadataHubs
|
|
||||||
|
|
||||||
from .room import create_playlist_room
|
from .room import create_playlist_room
|
||||||
|
|
||||||
@@ -44,6 +43,8 @@ async def create_daily_challenge_room(
|
|||||||
|
|
||||||
@get_scheduler().scheduled_job("cron", hour=0, minute=0, second=0, id="daily_challenge")
|
@get_scheduler().scheduled_job("cron", hour=0, minute=0, second=0, id="daily_challenge")
|
||||||
async def daily_challenge_job():
|
async def daily_challenge_job():
|
||||||
|
from app.signalr.hub import MetadataHubs
|
||||||
|
|
||||||
today = datetime.now(UTC).date()
|
today = datetime.now(UTC).date()
|
||||||
redis = get_redis()
|
redis = get_redis()
|
||||||
key = f"daily_challenge:{today.year}-{today.month}-{today.day}"
|
key = f"daily_challenge:{today.year}-{today.month}-{today.day}"
|
||||||
|
|||||||
20
app/service/subscriber.py
Normal file
20
app/service/subscriber.py
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from app.dependencies.database import get_redis_pubsub
|
||||||
|
|
||||||
|
|
||||||
|
class RedisSubscriber:
|
||||||
|
def __init__(self, channel: str):
|
||||||
|
self.pubsub = get_redis_pubsub(channel)
|
||||||
|
self.handlers: dict[str, list[Callable[[str, str], Awaitable[Any]]]] = {}
|
||||||
|
|
||||||
|
async def listen(self):
|
||||||
|
async for message in self.pubsub.listen():
|
||||||
|
if message is not None and message["type"] == "message":
|
||||||
|
method = self.handlers.get(message["channel"])
|
||||||
|
if method:
|
||||||
|
for handler in method:
|
||||||
|
await handler(message["channel"], message["data"])
|
||||||
Reference in New Issue
Block a user