feat(daily-challenge): complete daily-challenge
This commit is contained in:
48
app/service/subscribers/base.py
Normal file
48
app/service/subscribers/base.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Any
|
||||
|
||||
from app.dependencies.database import get_redis_pubsub
|
||||
|
||||
|
||||
class RedisSubscriber:
|
||||
def __init__(self):
|
||||
self.pubsub = get_redis_pubsub()
|
||||
self.handlers: dict[str, list[Callable[[str, str], Awaitable[Any]]]] = {}
|
||||
self.task: asyncio.Task | None = None
|
||||
|
||||
async def subscribe(self, channel: str):
|
||||
await self.pubsub.subscribe(channel)
|
||||
if channel not in self.handlers:
|
||||
self.handlers[channel] = []
|
||||
|
||||
async def unsubscribe(self, channel: str):
|
||||
if channel in self.handlers:
|
||||
del self.handlers[channel]
|
||||
await self.pubsub.unsubscribe(channel)
|
||||
|
||||
async def listen(self):
|
||||
while True:
|
||||
message = await self.pubsub.get_message(
|
||||
ignore_subscribe_messages=True, timeout=None
|
||||
)
|
||||
if message is not None and message["type"] == "message":
|
||||
method = self.handlers.get(message["channel"])
|
||||
if method:
|
||||
await asyncio.gather(
|
||||
*[
|
||||
handler(message["channel"], message["data"])
|
||||
for handler in method
|
||||
]
|
||||
)
|
||||
|
||||
def start(self):
|
||||
if self.task is None or self.task.done():
|
||||
self.task = asyncio.create_task(self.listen())
|
||||
|
||||
def stop(self):
|
||||
if self.task is not None and not self.task.done():
|
||||
self.task.cancel()
|
||||
self.task = None
|
||||
Reference in New Issue
Block a user