refactor(log): add prefix for fetcher and services
This commit is contained in:
@@ -1,10 +1,12 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from app.database.beatmap import BeatmapResp
|
from app.database.beatmap import BeatmapResp
|
||||||
from app.log import logger
|
from app.log import fetcher_logger
|
||||||
|
|
||||||
from ._base import BaseFetcher
|
from ._base import BaseFetcher
|
||||||
|
|
||||||
|
logger = fetcher_logger("BeatmapFetcher")
|
||||||
|
|
||||||
|
|
||||||
class BeatmapFetcher(BaseFetcher):
|
class BeatmapFetcher(BaseFetcher):
|
||||||
async def get_beatmap(self, beatmap_id: int | None = None, beatmap_checksum: str | None = None) -> BeatmapResp:
|
async def get_beatmap(self, beatmap_id: int | None = None, beatmap_checksum: str | None = None) -> BeatmapResp:
|
||||||
@@ -14,7 +16,7 @@ class BeatmapFetcher(BaseFetcher):
|
|||||||
params = {"checksum": beatmap_checksum}
|
params = {"checksum": beatmap_checksum}
|
||||||
else:
|
else:
|
||||||
raise ValueError("Either beatmap_id or beatmap_checksum must be provided.")
|
raise ValueError("Either beatmap_id or beatmap_checksum must be provided.")
|
||||||
logger.opt(colors=True).debug(f"<blue>[BeatmapFetcher]</blue> get_beatmap: <y>{params}</y>")
|
logger.opt(colors=True).debug(f"get_beatmap: <y>{params}</y>")
|
||||||
|
|
||||||
return BeatmapResp.model_validate(
|
return BeatmapResp.model_validate(
|
||||||
await self.request_api(
|
await self.request_api(
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.log import fetcher_logger
|
||||||
|
|
||||||
from ._base import BaseFetcher
|
from ._base import BaseFetcher
|
||||||
|
|
||||||
from httpx import AsyncClient, HTTPError
|
from httpx import AsyncClient, HTTPError
|
||||||
from httpx._models import Response
|
from httpx._models import Response
|
||||||
from loguru import logger
|
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
|
|
||||||
urls = [
|
urls = [
|
||||||
@@ -13,12 +14,14 @@ urls = [
|
|||||||
"https://catboy.best/osu/{beatmap_id}",
|
"https://catboy.best/osu/{beatmap_id}",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
logger = fetcher_logger("BeatmapRawFetcher")
|
||||||
|
|
||||||
|
|
||||||
class BeatmapRawFetcher(BaseFetcher):
|
class BeatmapRawFetcher(BaseFetcher):
|
||||||
async def get_beatmap_raw(self, beatmap_id: int) -> str:
|
async def get_beatmap_raw(self, beatmap_id: int) -> str:
|
||||||
for url in urls:
|
for url in urls:
|
||||||
req_url = url.format(beatmap_id=beatmap_id)
|
req_url = url.format(beatmap_id=beatmap_id)
|
||||||
logger.opt(colors=True).debug(f"<blue>[BeatmapRawFetcher]</blue> get_beatmap_raw: <y>{req_url}</y>")
|
logger.opt(colors=True).debug(f"get_beatmap_raw: <y>{req_url}</y>")
|
||||||
resp = await self._request(req_url)
|
resp = await self._request(req_url)
|
||||||
if resp.status_code >= 400:
|
if resp.status_code >= 400:
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import json
|
|||||||
|
|
||||||
from app.database.beatmapset import BeatmapsetResp, SearchBeatmapsetsResp
|
from app.database.beatmapset import BeatmapsetResp, SearchBeatmapsetsResp
|
||||||
from app.helpers.rate_limiter import osu_api_rate_limiter
|
from app.helpers.rate_limiter import osu_api_rate_limiter
|
||||||
from app.log import logger
|
from app.log import fetcher_logger
|
||||||
from app.models.beatmap import SearchQueryModel
|
from app.models.beatmap import SearchQueryModel
|
||||||
from app.models.model import Cursor
|
from app.models.model import Cursor
|
||||||
from app.utils import bg_tasks
|
from app.utils import bg_tasks
|
||||||
@@ -24,6 +24,9 @@ class RateLimitError(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
logger = fetcher_logger("BeatmapsetFetcher")
|
||||||
|
|
||||||
|
|
||||||
class BeatmapsetFetcher(BaseFetcher):
|
class BeatmapsetFetcher(BaseFetcher):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_homepage_queries() -> list[tuple[SearchQueryModel, Cursor]]:
|
def _get_homepage_queries() -> list[tuple[SearchQueryModel, Cursor]]:
|
||||||
@@ -135,7 +138,7 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
return {}
|
return {}
|
||||||
|
|
||||||
async def get_beatmapset(self, beatmap_set_id: int) -> BeatmapsetResp:
|
async def get_beatmapset(self, beatmap_set_id: int) -> BeatmapsetResp:
|
||||||
logger.opt(colors=True).debug(f"<blue>[BeatmapsetFetcher]</blue> get_beatmapset: <y>{beatmap_set_id}</y>")
|
logger.opt(colors=True).debug(f"get_beatmapset: <y>{beatmap_set_id}</y>")
|
||||||
|
|
||||||
return BeatmapsetResp.model_validate(
|
return BeatmapsetResp.model_validate(
|
||||||
await self.request_api(f"https://osu.ppy.sh/api/v2/beatmapsets/{beatmap_set_id}")
|
await self.request_api(f"https://osu.ppy.sh/api/v2/beatmapsets/{beatmap_set_id}")
|
||||||
@@ -144,7 +147,7 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
async def search_beatmapset(
|
async def search_beatmapset(
|
||||||
self, query: SearchQueryModel, cursor: Cursor, redis_client: redis.Redis
|
self, query: SearchQueryModel, cursor: Cursor, redis_client: redis.Redis
|
||||||
) -> SearchBeatmapsetsResp:
|
) -> SearchBeatmapsetsResp:
|
||||||
logger.opt(colors=True).debug(f"<blue>[BeatmapsetFetcher]</blue> search_beatmapset: <y>{query}</y>")
|
logger.opt(colors=True).debug(f"search_beatmapset: <y>{query}</y>")
|
||||||
|
|
||||||
# 生成缓存键
|
# 生成缓存键
|
||||||
cache_key = self._generate_cache_key(query, cursor)
|
cache_key = self._generate_cache_key(query, cursor)
|
||||||
@@ -152,17 +155,15 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
# 尝试从缓存获取结果
|
# 尝试从缓存获取结果
|
||||||
cached_result = await redis_client.get(cache_key)
|
cached_result = await redis_client.get(cache_key)
|
||||||
if cached_result:
|
if cached_result:
|
||||||
logger.opt(colors=True).debug(f"<green>[BeatmapsetFetcher]</green> Cache hit for key: <y>{cache_key}</y>")
|
logger.opt(colors=True).debug(f"Cache hit for key: <y>{cache_key}</y>")
|
||||||
try:
|
try:
|
||||||
cached_data = json.loads(cached_result)
|
cached_data = json.loads(cached_result)
|
||||||
return SearchBeatmapsetsResp.model_validate(cached_data)
|
return SearchBeatmapsetsResp.model_validate(cached_data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.opt(colors=True).warning(
|
logger.opt(colors=True).warning(f"Cache data invalid, fetching from API: {e}")
|
||||||
f"<yellow>[BeatmapsetFetcher]</yellow> Cache data invalid, fetching from API: {e}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# 缓存未命中,从 API 获取数据
|
# 缓存未命中,从 API 获取数据
|
||||||
logger.opt(colors=True).debug("<blue>[BeatmapsetFetcher]</blue> Cache miss, fetching from API")
|
logger.opt(colors=True).debug("Cache miss, fetching from API")
|
||||||
|
|
||||||
params = query.model_dump(exclude_none=True, exclude_unset=True, exclude_defaults=True)
|
params = query.model_dump(exclude_none=True, exclude_unset=True, exclude_defaults=True)
|
||||||
|
|
||||||
@@ -186,9 +187,7 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
cache_ttl = 15 * 60 # 15 分钟
|
cache_ttl = 15 * 60 # 15 分钟
|
||||||
await redis_client.set(cache_key, json.dumps(api_response, separators=(",", ":")), ex=cache_ttl)
|
await redis_client.set(cache_key, json.dumps(api_response, separators=(",", ":")), ex=cache_ttl)
|
||||||
|
|
||||||
logger.opt(colors=True).debug(
|
logger.opt(colors=True).debug(f"Cached result for key: <y>{cache_key}</y> (TTL: {cache_ttl}s)")
|
||||||
f"<green>[BeatmapsetFetcher]</green> Cached result for key: <y>{cache_key}</y> (TTL: {cache_ttl}s)"
|
|
||||||
)
|
|
||||||
|
|
||||||
resp = SearchBeatmapsetsResp.model_validate(api_response)
|
resp = SearchBeatmapsetsResp.model_validate(api_response)
|
||||||
|
|
||||||
@@ -204,9 +203,7 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
try:
|
try:
|
||||||
await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=1)
|
await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=1)
|
||||||
except RateLimitError:
|
except RateLimitError:
|
||||||
logger.opt(colors=True).info(
|
logger.opt(colors=True).info("Prefetch skipped due to rate limit")
|
||||||
"<yellow>[BeatmapsetFetcher]</yellow> Prefetch skipped due to rate limit"
|
|
||||||
)
|
|
||||||
|
|
||||||
bg_tasks.add_task(delayed_prefetch)
|
bg_tasks.add_task(delayed_prefetch)
|
||||||
|
|
||||||
@@ -230,14 +227,14 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
# 使用当前 cursor 请求下一页
|
# 使用当前 cursor 请求下一页
|
||||||
next_query = query.model_copy()
|
next_query = query.model_copy()
|
||||||
|
|
||||||
logger.opt(colors=True).debug(f"<cyan>[BeatmapsetFetcher]</cyan> Prefetching page {page + 1}")
|
logger.opt(colors=True).debug(f"Prefetching page {page + 1}")
|
||||||
|
|
||||||
# 生成下一页的缓存键
|
# 生成下一页的缓存键
|
||||||
next_cache_key = self._generate_cache_key(next_query, cursor)
|
next_cache_key = self._generate_cache_key(next_query, cursor)
|
||||||
|
|
||||||
# 检查是否已经缓存
|
# 检查是否已经缓存
|
||||||
if await redis_client.exists(next_cache_key):
|
if await redis_client.exists(next_cache_key):
|
||||||
logger.opt(colors=True).debug(f"<cyan>[BeatmapsetFetcher]</cyan> Page {page + 1} already cached")
|
logger.opt(colors=True).debug(f"Page {page + 1} already cached")
|
||||||
# 尝试从缓存获取cursor继续预取
|
# 尝试从缓存获取cursor继续预取
|
||||||
cached_data = await redis_client.get(next_cache_key)
|
cached_data = await redis_client.get(next_cache_key)
|
||||||
if cached_data:
|
if cached_data:
|
||||||
@@ -282,22 +279,18 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
ex=prefetch_ttl,
|
ex=prefetch_ttl,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.opt(colors=True).debug(
|
logger.opt(colors=True).debug(f"Prefetched page {page + 1} (TTL: {prefetch_ttl}s)")
|
||||||
f"<cyan>[BeatmapsetFetcher]</cyan> Prefetched page {page + 1} (TTL: {prefetch_ttl}s)"
|
|
||||||
)
|
|
||||||
|
|
||||||
except RateLimitError:
|
except RateLimitError:
|
||||||
logger.opt(colors=True).info("<yellow>[BeatmapsetFetcher]</yellow> Prefetch stopped due to rate limit")
|
logger.opt(colors=True).info("Prefetch stopped due to rate limit")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.opt(colors=True).warning(f"<yellow>[BeatmapsetFetcher]</yellow> Prefetch failed: {e}")
|
logger.opt(colors=True).warning(f"Prefetch failed: {e}")
|
||||||
|
|
||||||
async def warmup_homepage_cache(self, redis_client: redis.Redis) -> None:
|
async def warmup_homepage_cache(self, redis_client: redis.Redis) -> None:
|
||||||
"""预热主页缓存"""
|
"""预热主页缓存"""
|
||||||
homepage_queries = self._get_homepage_queries()
|
homepage_queries = self._get_homepage_queries()
|
||||||
|
|
||||||
logger.opt(colors=True).info(
|
logger.opt(colors=True).info(f"Starting homepage cache warmup ({len(homepage_queries)} queries)")
|
||||||
f"<magenta>[BeatmapsetFetcher]</magenta> Starting homepage cache warmup ({len(homepage_queries)} queries)"
|
|
||||||
)
|
|
||||||
|
|
||||||
for i, (query, cursor) in enumerate(homepage_queries):
|
for i, (query, cursor) in enumerate(homepage_queries):
|
||||||
try:
|
try:
|
||||||
@@ -309,9 +302,7 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
|
|
||||||
# 检查是否已经缓存
|
# 检查是否已经缓存
|
||||||
if await redis_client.exists(cache_key):
|
if await redis_client.exists(cache_key):
|
||||||
logger.opt(colors=True).debug(
|
logger.opt(colors=True).debug(f"Query {query.sort} already cached")
|
||||||
f"<magenta>[BeatmapsetFetcher]</magenta> Query {query.sort} already cached"
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 请求并缓存
|
# 请求并缓存
|
||||||
@@ -334,24 +325,15 @@ class BeatmapsetFetcher(BaseFetcher):
|
|||||||
ex=cache_ttl,
|
ex=cache_ttl,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.opt(colors=True).info(
|
logger.opt(colors=True).info(f"Warmed up cache for {query.sort} (TTL: {cache_ttl}s)")
|
||||||
f"<magenta>[BeatmapsetFetcher]</magenta> Warmed up cache for {query.sort} (TTL: {cache_ttl}s)"
|
|
||||||
)
|
|
||||||
|
|
||||||
if api_response.get("cursor"):
|
if api_response.get("cursor"):
|
||||||
try:
|
try:
|
||||||
await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=2)
|
await self.prefetch_next_pages(query, api_response["cursor"], redis_client, pages=2)
|
||||||
except RateLimitError:
|
except RateLimitError:
|
||||||
logger.opt(colors=True).info(
|
logger.opt(colors=True).info(f"Warmup prefetch skipped for {query.sort} due to rate limit")
|
||||||
f"<yellow>[BeatmapsetFetcher]</yellow> Warmup prefetch "
|
|
||||||
f"skipped for {query.sort} due to rate limit"
|
|
||||||
)
|
|
||||||
|
|
||||||
except RateLimitError:
|
except RateLimitError:
|
||||||
logger.opt(colors=True).warning(
|
logger.opt(colors=True).warning(f"Warmup skipped for {query.sort} due to rate limit")
|
||||||
f"<yellow>[BeatmapsetFetcher]</yellow> Warmup skipped for {query.sort} due to rate limit"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.opt(colors=True).error(
|
logger.opt(colors=True).error(f"Failed to warmup cache for {query.sort}: {e}")
|
||||||
f"<red>[BeatmapsetFetcher]</red> Failed to warmup cache for {query.sort}: {e}"
|
|
||||||
)
|
|
||||||
|
|||||||
59
app/log.py
59
app/log.py
@@ -5,6 +5,7 @@ import inspect
|
|||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
from sys import stdout
|
from sys import stdout
|
||||||
|
from types import FunctionType
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from app.config import settings
|
from app.config import settings
|
||||||
@@ -107,11 +108,67 @@ class InterceptHandler(logging.Handler):
|
|||||||
return message
|
return message
|
||||||
|
|
||||||
|
|
||||||
|
def get_caller_class_name(module_prefix: str = ""):
|
||||||
|
"""获取调用类名/模块名,仅对指定模块前缀生效"""
|
||||||
|
stack = inspect.stack()
|
||||||
|
for frame_info in stack[2:]:
|
||||||
|
module = frame_info.frame.f_globals.get("__name__", "")
|
||||||
|
if module_prefix and not module.startswith(module_prefix):
|
||||||
|
continue
|
||||||
|
|
||||||
|
local_vars = frame_info.frame.f_locals
|
||||||
|
# 实例方法
|
||||||
|
if "self" in local_vars:
|
||||||
|
return local_vars["self"].__class__.__name__
|
||||||
|
# 类方法
|
||||||
|
if "cls" in local_vars:
|
||||||
|
return local_vars["cls"].__name__
|
||||||
|
|
||||||
|
# 静态方法 / 普通函数 -> 尝试通过函数名匹配类
|
||||||
|
func_name = frame_info.function
|
||||||
|
for obj_name, obj in frame_info.frame.f_globals.items():
|
||||||
|
if isinstance(obj, type): # 遍历模块内类
|
||||||
|
cls = obj
|
||||||
|
attr = getattr(cls, func_name, None)
|
||||||
|
if isinstance(attr, (staticmethod, classmethod, FunctionType)):
|
||||||
|
return cls.__name__
|
||||||
|
|
||||||
|
# 如果没找到类,返回模块名
|
||||||
|
return module
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def service_logger(name: str) -> Logger:
|
||||||
|
return logger.bind(service=name)
|
||||||
|
|
||||||
|
|
||||||
|
def fetcher_logger(name: str) -> Logger:
|
||||||
|
return logger.bind(fetcher=name)
|
||||||
|
|
||||||
|
|
||||||
|
def dynamic_format(record):
|
||||||
|
prefix = ""
|
||||||
|
|
||||||
|
fetcher = record["extra"].get("fetcher")
|
||||||
|
if not fetcher:
|
||||||
|
fetcher = get_caller_class_name("app.fetcher")
|
||||||
|
if fetcher:
|
||||||
|
prefix = f"<magenta>[{fetcher}]</magenta> "
|
||||||
|
|
||||||
|
service = record["extra"].get("service")
|
||||||
|
if not service:
|
||||||
|
service = get_caller_class_name("app.service")
|
||||||
|
if service:
|
||||||
|
prefix = f"<blue>[{service}]</blue> "
|
||||||
|
|
||||||
|
return f"<green>{{time:YYYY-MM-DD HH:mm:ss}}</green> [<level>{{level}}</level>] | {prefix}{{message}}\n"
|
||||||
|
|
||||||
|
|
||||||
logger.remove()
|
logger.remove()
|
||||||
logger.add(
|
logger.add(
|
||||||
stdout,
|
stdout,
|
||||||
colorize=True,
|
colorize=True,
|
||||||
format=("<green>{time:YYYY-MM-DD HH:mm:ss}</green> [<level>{level}</level>] | {message}"),
|
format=dynamic_format,
|
||||||
level=settings.log_level,
|
level=settings.log_level,
|
||||||
diagnose=settings.debug,
|
diagnose=settings.debug,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -5,9 +5,10 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.log import logger
|
||||||
|
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
import httpx
|
import httpx
|
||||||
from loguru import logger
|
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -136,9 +136,7 @@ class BeatmapsetUpdateService:
|
|||||||
status = BeatmapRankStatus(beatmapset.ranked)
|
status = BeatmapRankStatus(beatmapset.ranked)
|
||||||
if status.has_pp() or status == BeatmapRankStatus.LOVED:
|
if status.has_pp() or status == BeatmapRankStatus.LOVED:
|
||||||
return False
|
return False
|
||||||
logger.opt(colors=True).debug(
|
logger.debug(f"added missing beatmapset {beatmapset_id} ")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> added missing beatmapset {beatmapset_id} "
|
|
||||||
)
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def add_missing_beatmapsets(self):
|
async def add_missing_beatmapsets(self):
|
||||||
@@ -167,11 +165,9 @@ class BeatmapsetUpdateService:
|
|||||||
if await self.add_missing_beatmapset(missing):
|
if await self.add_missing_beatmapset(missing):
|
||||||
total += 1
|
total += 1
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.opt(colors=True).error(
|
logger.error(f"failed to add missing beatmapset {missing}: {e}")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> failed to add missing beatmapset {missing}: {e}"
|
|
||||||
)
|
|
||||||
if total > 0:
|
if total > 0:
|
||||||
logger.opt(colors=True).info(f"<cyan>[BeatmapsetUpdateService]</cyan> added {total} missing beatmapset")
|
logger.info(f"added {total} missing beatmapset")
|
||||||
self._adding_missing = False
|
self._adding_missing = False
|
||||||
|
|
||||||
async def add(self, beatmapset: BeatmapsetResp):
|
async def add(self, beatmapset: BeatmapsetResp):
|
||||||
@@ -211,23 +207,17 @@ class BeatmapsetUpdateService:
|
|||||||
processing = ProcessingBeatmapset(beatmapset, sync_record)
|
processing = ProcessingBeatmapset(beatmapset, sync_record)
|
||||||
next_time_delta = processing.calculate_next_sync_time()
|
next_time_delta = processing.calculate_next_sync_time()
|
||||||
if not next_time_delta:
|
if not next_time_delta:
|
||||||
logger.opt(colors=True).info(
|
logger.info(f"[{beatmapset.id}] beatmapset has transformed to ranked or loved, removing from sync list")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{beatmapset.id}] "
|
|
||||||
"beatmapset has transformed to ranked or loved,"
|
|
||||||
" removing from sync list"
|
|
||||||
)
|
|
||||||
await session.delete(sync_record)
|
await session.delete(sync_record)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
return
|
return
|
||||||
sync_record.next_sync_time = utcnow() + next_time_delta
|
sync_record.next_sync_time = utcnow() + next_time_delta
|
||||||
logger.opt(colors=True).info(
|
logger.info(f"[{beatmapset.id}] next sync at {sync_record.next_sync_time}")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{beatmapset.id}] next sync at {sync_record.next_sync_time}"
|
|
||||||
)
|
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
async def _update_beatmaps(self):
|
async def _update_beatmaps(self):
|
||||||
async with with_db() as session:
|
async with with_db() as session:
|
||||||
logger.opt(colors=True).info("<cyan>[BeatmapsetUpdateService]</cyan> checking for beatmapset updates...")
|
logger.info("checking for beatmapset updates...")
|
||||||
now = utcnow()
|
now = utcnow()
|
||||||
records = await session.exec(
|
records = await session.exec(
|
||||||
select(BeatmapSync)
|
select(BeatmapSync)
|
||||||
@@ -235,22 +225,17 @@ class BeatmapsetUpdateService:
|
|||||||
.order_by(col(BeatmapSync.next_sync_time).desc())
|
.order_by(col(BeatmapSync.next_sync_time).desc())
|
||||||
)
|
)
|
||||||
for record in records:
|
for record in records:
|
||||||
logger.opt(colors=True).info(
|
logger.info(f"[{record.beatmapset_id}] syncing...")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] syncing..."
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id)
|
beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if isinstance(e, HTTPError):
|
if isinstance(e, HTTPError):
|
||||||
logger.opt(colors=True).warning(
|
logger.warning(
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] "
|
f"[{record.beatmapset_id}] "
|
||||||
f"failed to fetch beatmapset: [{e.__class__.__name__}] {e}, retrying later"
|
f"failed to fetch beatmapset: [{e.__class__.__name__}] {e}, retrying later"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.opt(colors=True).exception(
|
logger.exception(f"[{record.beatmapset_id}] unexpected error: {e}, retrying later")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] "
|
|
||||||
f"unexpected error: {e}, retrying later"
|
|
||||||
)
|
|
||||||
record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA)
|
record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA)
|
||||||
continue
|
continue
|
||||||
processing = ProcessingBeatmapset(beatmapset, record)
|
processing = ProcessingBeatmapset(beatmapset, record)
|
||||||
@@ -282,18 +267,15 @@ class BeatmapsetUpdateService:
|
|||||||
|
|
||||||
next_time_delta = processing.calculate_next_sync_time()
|
next_time_delta = processing.calculate_next_sync_time()
|
||||||
if not next_time_delta:
|
if not next_time_delta:
|
||||||
logger.opt(colors=True).info(
|
logger.info(
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] beatmapset "
|
f"[{record.beatmapset_id}] beatmapset "
|
||||||
"has transformed to ranked or loved,"
|
"has transformed to ranked or loved,"
|
||||||
" removing from sync list"
|
" removing from sync list"
|
||||||
)
|
)
|
||||||
await session.delete(record)
|
await session.delete(record)
|
||||||
else:
|
else:
|
||||||
record.next_sync_time = utcnow() + next_time_delta
|
record.next_sync_time = utcnow() + next_time_delta
|
||||||
logger.opt(colors=True).info(
|
logger.info(f"[{record.beatmapset_id}] next sync at {record.next_sync_time}")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] "
|
|
||||||
f"next sync at {record.next_sync_time}"
|
|
||||||
)
|
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
async def _process_changed_beatmapset(self, beatmapset: BeatmapsetResp):
|
async def _process_changed_beatmapset(self, beatmapset: BeatmapsetResp):
|
||||||
@@ -323,9 +305,7 @@ class BeatmapsetUpdateService:
|
|||||||
await score.ranked_score.delete(session)
|
await score.ranked_score.delete(session)
|
||||||
total += 1
|
total += 1
|
||||||
if total > 0:
|
if total > 0:
|
||||||
logger.opt(colors=True).info(
|
logger.info(f"[beatmap: {beatmap_id}] processed {total} old scores")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [beatmap: {beatmap_id}] processed {total} old scores"
|
|
||||||
)
|
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
for change in changed:
|
for change in changed:
|
||||||
@@ -333,28 +313,17 @@ class BeatmapsetUpdateService:
|
|||||||
try:
|
try:
|
||||||
beatmap = await self.fetcher.get_beatmap(change.beatmap_id)
|
beatmap = await self.fetcher.get_beatmap(change.beatmap_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.opt(colors=True).error(
|
logger.error(f"[beatmap: {change.beatmap_id}] failed to fetch added beatmap: {e}, skipping")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [beatmap: {change.beatmap_id}] "
|
|
||||||
f"failed to fetch added beatmap: {e}, skipping"
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
logger.opt(colors=True).info(
|
logger.info(f"[{beatmap.beatmapset_id}] adding beatmap {beatmap.id}")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{beatmap.beatmapset_id}] adding beatmap {beatmap.id}"
|
|
||||||
)
|
|
||||||
await Beatmap.from_resp_no_save(session, beatmap)
|
await Beatmap.from_resp_no_save(session, beatmap)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
beatmap = await self.fetcher.get_beatmap(change.beatmap_id)
|
beatmap = await self.fetcher.get_beatmap(change.beatmap_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.opt(colors=True).error(
|
logger.error(f"[beatmap: {change.beatmap_id}] failed to fetch changed beatmap: {e}, skipping")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [beatmap: {change.beatmap_id}] "
|
|
||||||
f"failed to fetch changed beatmap: {e}, skipping"
|
|
||||||
)
|
|
||||||
continue
|
continue
|
||||||
logger.opt(colors=True).info(
|
logger.info(f"[{beatmap.beatmapset_id}] processing beatmap {beatmap.id} change {change.type}")
|
||||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{beatmap.beatmapset_id}] processing beatmap "
|
|
||||||
f"{beatmap.id} change {change.type}"
|
|
||||||
)
|
|
||||||
new_db_beatmap = await Beatmap.from_resp_no_save(session, beatmap)
|
new_db_beatmap = await Beatmap.from_resp_no_save(session, beatmap)
|
||||||
existing_beatmap = await session.get(Beatmap, change.beatmap_id)
|
existing_beatmap = await session.get(Beatmap, change.beatmap_id)
|
||||||
if existing_beatmap:
|
if existing_beatmap:
|
||||||
|
|||||||
@@ -430,7 +430,7 @@ class LoginSessionService:
|
|||||||
await db.commit()
|
await db.commit()
|
||||||
await db.refresh(session)
|
await db.refresh(session)
|
||||||
|
|
||||||
logger.info(f"[Login Session] Created session for user {user_id} (new device: {is_new_device})")
|
logger.info(f"Created session for user {user_id} (new device: {is_new_device})")
|
||||||
return session
|
return session
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -562,7 +562,7 @@ class LoginSessionService:
|
|||||||
session.device_id = device_info.id
|
session.device_id = device_info.id
|
||||||
|
|
||||||
if sessions:
|
if sessions:
|
||||||
logger.info(f"[Login Session] Marked {len(sessions)} session(s) as verified for user {user_id}")
|
logger.info(f"Marked {len(sessions)} session(s) as verified for user {user_id}")
|
||||||
|
|
||||||
await LoginSessionService.clear_login_method(user_id, token_id, redis)
|
await LoginSessionService.clear_login_method(user_id, token_id, redis)
|
||||||
await db.commit()
|
await db.commit()
|
||||||
@@ -570,7 +570,7 @@ class LoginSessionService:
|
|||||||
return len(sessions) > 0
|
return len(sessions) > 0
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[Login Session] Exception during marking sessions as verified: {e}")
|
logger.error(f"Exception during marking sessions as verified: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
Reference in New Issue
Block a user