fix(user): don't invalidate user cache when user is connecting to spectator-server
resolve the sixth of #90
This commit is contained in:
@@ -90,6 +90,7 @@ class UserDict(TypedDict):
|
|||||||
pm_friends_only: bool
|
pm_friends_only: bool
|
||||||
profile_colour: str | None
|
profile_colour: str | None
|
||||||
username: str
|
username: str
|
||||||
|
is_online: bool
|
||||||
g0v0_playmode: GameMode
|
g0v0_playmode: GameMode
|
||||||
page: NotRequired[Page]
|
page: NotRequired[Page]
|
||||||
previous_usernames: NotRequired[list[str]]
|
previous_usernames: NotRequired[list[str]]
|
||||||
@@ -154,7 +155,6 @@ class UserDict(TypedDict):
|
|||||||
kudosu: NotRequired[Kudosu]
|
kudosu: NotRequired[Kudosu]
|
||||||
unread_pm_count: NotRequired[int]
|
unread_pm_count: NotRequired[int]
|
||||||
default_group: NotRequired[str]
|
default_group: NotRequired[str]
|
||||||
is_online: NotRequired[bool]
|
|
||||||
session_verified: NotRequired[bool]
|
session_verified: NotRequired[bool]
|
||||||
session_verification_method: NotRequired[Literal["totp", "mail"] | None]
|
session_verification_method: NotRequired[Literal["totp", "mail"] | None]
|
||||||
|
|
||||||
@@ -270,6 +270,7 @@ class UserModel(DatabaseModel[UserDict]):
|
|||||||
is_active: bool = True
|
is_active: bool = True
|
||||||
is_bot: bool = False
|
is_bot: bool = False
|
||||||
is_supporter: bool = False
|
is_supporter: bool = False
|
||||||
|
is_online: bool = False
|
||||||
last_visit: datetime | None = Field(default_factory=utcnow, sa_column=Column(DateTime(timezone=True)))
|
last_visit: datetime | None = Field(default_factory=utcnow, sa_column=Column(DateTime(timezone=True)))
|
||||||
pm_friends_only: bool = False
|
pm_friends_only: bool = False
|
||||||
profile_colour: str | None = None
|
profile_colour: str | None = None
|
||||||
@@ -661,14 +662,6 @@ class UserModel(DatabaseModel[UserDict]):
|
|||||||
async def default_group(_session: AsyncSession, obj: "User") -> str:
|
async def default_group(_session: AsyncSession, obj: "User") -> str:
|
||||||
return "default" if not obj.is_bot else "bot"
|
return "default" if not obj.is_bot else "bot"
|
||||||
|
|
||||||
@included
|
|
||||||
@staticmethod
|
|
||||||
async def is_online(_session: AsyncSession, obj: "User") -> bool:
|
|
||||||
from app.dependencies.database import get_redis
|
|
||||||
|
|
||||||
redis = get_redis()
|
|
||||||
return bool(await redis.exists(f"metadata:online:{obj.id}"))
|
|
||||||
|
|
||||||
@ondemand
|
@ondemand
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def session_verified(
|
async def session_verified(
|
||||||
|
|||||||
22
app/service/subscribers/user_cache.py
Normal file
22
app/service/subscribers/user_cache.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
from app.dependencies.database import get_redis
|
||||||
|
from app.log import logger
|
||||||
|
from app.service.user_cache_service import get_user_cache_service
|
||||||
|
|
||||||
|
from .base import RedisSubscriber
|
||||||
|
|
||||||
|
KEY = "user:online_status"
|
||||||
|
|
||||||
|
|
||||||
|
class UserOnlineSubscriber(RedisSubscriber):
|
||||||
|
async def start_subscribe(self):
|
||||||
|
await self.subscribe(KEY)
|
||||||
|
self.add_handler(KEY, self.on)
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
async def on(self, c: str, s: str): # noqa: ARG002
|
||||||
|
user_id = int(s)
|
||||||
|
logger.info(f"Received user online status update for user_id: {s}")
|
||||||
|
await get_user_cache_service(get_redis()).invalidate_user_cache(user_id)
|
||||||
|
|
||||||
|
|
||||||
|
user_online_subscriber = UserOnlineSubscriber()
|
||||||
@@ -244,7 +244,7 @@ class UserCacheService:
|
|||||||
"""使用户缓存失效"""
|
"""使用户缓存失效"""
|
||||||
try:
|
try:
|
||||||
# 删除用户信息缓存
|
# 删除用户信息缓存
|
||||||
pattern = f"user:{user_id}*"
|
pattern = f"user:{user_id}:ruleset:*"
|
||||||
keys = await self.redis.keys(pattern)
|
keys = await self.redis.keys(pattern)
|
||||||
if keys:
|
if keys:
|
||||||
await self.redis.delete(*keys)
|
await self.redis.delete(*keys)
|
||||||
@@ -252,6 +252,18 @@ class UserCacheService:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error invalidating user cache: {e}")
|
logger.error(f"Error invalidating user cache: {e}")
|
||||||
|
|
||||||
|
async def invalidate_user_all_cache(self, user_id: int):
|
||||||
|
"""使用户所有缓存失效"""
|
||||||
|
try:
|
||||||
|
# 删除用户信息缓存
|
||||||
|
pattern = f"user:{user_id}*"
|
||||||
|
keys = await self.redis.keys(pattern)
|
||||||
|
if keys:
|
||||||
|
await self.redis.delete(*keys)
|
||||||
|
logger.info(f"Invalidated {len(keys)} all cache entries for user {user_id}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error invalidating user all cache: {e}")
|
||||||
|
|
||||||
async def invalidate_user_scores_cache(self, user_id: int, mode: GameMode | None = None):
|
async def invalidate_user_scores_cache(self, user_id: int, mode: GameMode | None = None):
|
||||||
"""使用户成绩缓存失效"""
|
"""使用户成绩缓存失效"""
|
||||||
try:
|
try:
|
||||||
|
|||||||
2
main.py
2
main.py
@@ -35,6 +35,7 @@ from app.service.beatmap_download_service import download_service
|
|||||||
from app.service.beatmapset_update_service import init_beatmapset_update_service
|
from app.service.beatmapset_update_service import init_beatmapset_update_service
|
||||||
from app.service.email_queue import start_email_processor, stop_email_processor
|
from app.service.email_queue import start_email_processor, stop_email_processor
|
||||||
from app.service.redis_message_system import redis_message_system
|
from app.service.redis_message_system import redis_message_system
|
||||||
|
from app.service.subscribers.user_cache import user_online_subscriber
|
||||||
from app.tasks import (
|
from app.tasks import (
|
||||||
calculate_user_rank,
|
calculate_user_rank,
|
||||||
create_banchobot,
|
create_banchobot,
|
||||||
@@ -90,6 +91,7 @@ async def lifespan(app: FastAPI): # noqa: ARG001
|
|||||||
init_beatmapset_update_service(fetcher) # 初始化谱面集更新服务
|
init_beatmapset_update_service(fetcher) # 初始化谱面集更新服务
|
||||||
redis_message_system.start()
|
redis_message_system.start()
|
||||||
start_scheduler()
|
start_scheduler()
|
||||||
|
await user_online_subscriber.start_subscribe()
|
||||||
|
|
||||||
# show the status of AssetProxy
|
# show the status of AssetProxy
|
||||||
if settings.enable_asset_proxy:
|
if settings.enable_asset_proxy:
|
||||||
|
|||||||
@@ -0,0 +1,31 @@
|
|||||||
|
"""user: store online status in database
|
||||||
|
|
||||||
|
Revision ID: d430db6fc051
|
||||||
|
Revises: 57641cb601f4
|
||||||
|
Create Date: 2025-12-06 12:57:44.247351
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from collections.abc import Sequence
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = "d430db6fc051"
|
||||||
|
down_revision: str | Sequence[str] | None = "57641cb601f4"
|
||||||
|
branch_labels: str | Sequence[str] | None = None
|
||||||
|
depends_on: str | Sequence[str] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
"""Upgrade schema."""
|
||||||
|
op.add_column(
|
||||||
|
"lazer_users",
|
||||||
|
sa.Column("is_online", sa.Boolean(), nullable=False, server_default=sa.text("0")),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
"""Downgrade schema."""
|
||||||
|
op.drop_column("lazer_users", "is_online")
|
||||||
Reference in New Issue
Block a user