Files
g0v0-server/app/service/subscribers/chat.py
MingxuanGame d23f32f08d refactor(log): refactor the whole project
format: {time:YYYY-MM-DD HH:mm:ss} [{level}] | {name} | {message}
{name} is:
- Uvicorn: log from uvicorn server (#228B22)
- Service: log from class of `app.service` (blue)
- Fetcher: log from fetchers (magenta)
- Task: log from `app.tasks` (#FFD700)
- System: log from `system_logger` (red)
- Normal: log from `log(name)` (#FFC1C1)
- Default: the module name of caller

if you are writing services or tasks, you can just call `logger.`, we will pack it with name `Service` or `Task`
if you want to print fetcher logs, system-related logs, or normal logs, use `logger = (fetcher_logger / system_logger / log)(name)`
2025-10-03 11:53:05 +00:00

62 lines
1.9 KiB
Python

from __future__ import annotations
from typing import TYPE_CHECKING
from app.log import log
from app.models.notification import NotificationDetails
from .base import RedisSubscriber
from pydantic import TypeAdapter
if TYPE_CHECKING:
from app.router.notification.server import ChatServer
JOIN_CHANNEL = "chat:room:joined"
EXIT_CHANNEL = "chat:room:left"
ON_NOTIFICATION = "chat:notification"
logger = log("Chat")
class ChatSubscriber(RedisSubscriber):
def __init__(self):
super().__init__()
self.room_subscriber: dict[int, list[int]] = {}
self.chat_server: "ChatServer | None" = None
async def start_subscribe(self):
await self.subscribe(JOIN_CHANNEL)
self.add_handler(JOIN_CHANNEL, self.on_join_room)
await self.subscribe(EXIT_CHANNEL)
self.add_handler(EXIT_CHANNEL, self.on_leave_room)
await self.subscribe(ON_NOTIFICATION)
self.add_handler(ON_NOTIFICATION, self.on_notification)
self.start()
async def on_join_room(self, c: str, s: str):
channel_id, user_id = s.split(":")
if self.chat_server is None:
return
await self.chat_server.join_room_channel(int(channel_id), int(user_id))
async def on_leave_room(self, c: str, s: str):
channel_id, user_id = s.split(":")
if self.chat_server is None:
return
await self.chat_server.leave_room_channel(int(channel_id), int(user_id))
async def on_notification(self, c: str, s: str):
try:
detail = TypeAdapter(NotificationDetails).validate_json(s)
except ValueError:
logger.exception("Failed to parse notification detail")
return
except Exception:
logger.exception("Failed to parse notification detail")
return
if self.chat_server is None:
return
await self.chat_server.new_private_notification(detail)