From 1db34bf5c543d2bfb835d1f29012a3d4d6bb4566 Mon Sep 17 00:00:00 2001 From: MingxuanGame Date: Sun, 5 Oct 2025 03:44:46 +0000 Subject: [PATCH] feat(sync): add configuration to control syncing & support sync immediately by API --- app/config.py | 12 ++ app/router/private/beatmapset.py | 11 +- app/service/beatmapset_update_service.py | 191 +++++++++++++---------- app/tasks/beatmapset_update.py | 24 +-- 4 files changed, 140 insertions(+), 98 deletions(-) diff --git a/app/config.py b/app/config.py index 27ce437..0c42c97 100644 --- a/app/config.py +++ b/app/config.py @@ -567,6 +567,18 @@ STORAGE_SETTINGS='{ "资源代理设置", ] + # 谱面同步设置 + enable_auto_beatmap_sync: Annotated[ + bool, + Field(default=False, description="启用自动谱面同步"), + "谱面同步设置", + ] + beatmap_sync_interval_minutes: Annotated[ + int, + Field(default=60, description="自动谱面同步间隔(分钟)"), + "谱面同步设置", + ] + # 反作弊设置 suspicious_score_check: Annotated[ bool, diff --git a/app/router/private/beatmapset.py b/app/router/private/beatmapset.py index f206562..fff4d04 100644 --- a/app/router/private/beatmapset.py +++ b/app/router/private/beatmapset.py @@ -10,7 +10,7 @@ from app.service.beatmapset_update_service import get_beatmapset_update_service from .router import router -from fastapi import Body, Depends, HTTPException +from fastapi import Body, Depends, HTTPException, Path, Query from fastapi_limiter.depends import RateLimiter from sqlmodel import col, exists, select @@ -93,21 +93,24 @@ async def rate_beatmaps( dependencies=[Depends(RateLimiter(times=50, hours=1))], ) async def sync_beatmapset( - beatmapset_id: int, + beatmapset_id: Annotated[int, Path(..., description="谱面集ID")], session: Database, current_user: ClientUser, + immediate: Annotated[bool, Query(description="是否立即同步")] = False, ): """请求同步谱面集 请求将指定的谱面集从 Bancho 同步到服务器 - 请求发送后会加入同步队列,等待自动同步 + 默认情况下请求会加入同步队列,等待自动同步。 + 若设置 `immediate=true`,会尝试立刻同步该谱面集。 速率限制: - 每个用户每小时最多50次请求 参数: - beatmapset_id: 谱面集ID + - immediate: 是否立即同步(默认 false) 错误情况: - 404: 找不到指定谱面集 @@ -115,4 +118,4 @@ async def sync_beatmapset( current_beatmapset = (await session.exec(select(exists()).where(Beatmapset.id == beatmapset_id))).first() if not current_beatmapset: raise HTTPException(404, "Beatmapset Not Found") - await get_beatmapset_update_service().add_missing_beatmapset(beatmapset_id) + await get_beatmapset_update_service().add_missing_beatmapset(beatmapset_id, immediate) diff --git a/app/service/beatmapset_update_service.py b/app/service/beatmapset_update_service.py index da01e1f..19af886 100644 --- a/app/service/beatmapset_update_service.py +++ b/app/service/beatmapset_update_service.py @@ -17,6 +17,7 @@ from app.utils import bg_tasks, utcnow from httpx import HTTPError, HTTPStatusError from sqlmodel import col, select +from sqlmodel.ext.asyncio.session import AsyncSession if TYPE_CHECKING: from app.fetcher import Fetcher @@ -67,7 +68,7 @@ class ProcessingBeatmapset: def calculate_next_sync_time( self, ) -> timedelta | None: - if self.status.has_pp() or self.status == BeatmapRankStatus.LOVED: + if self.status.ranked(): return None now = utcnow() @@ -125,12 +126,16 @@ class BeatmapsetUpdateService: self.fetcher = fetcher self._adding_missing = False - async def add_missing_beatmapset(self, beatmapset_id: int) -> bool: + async def add_missing_beatmapset(self, beatmapset_id: int, immediate: bool = False) -> bool: beatmapset = await self.fetcher.get_beatmapset(beatmapset_id) - await self.add(beatmapset) status = BeatmapRankStatus(beatmapset.ranked) - if status.has_pp() or status == BeatmapRankStatus.LOVED: + if status.ranked(): return False + if immediate: + await self._sync_immediately(beatmapset) + logger.debug(f"triggered immediate sync for beatmapset {beatmapset_id} ") + return True + await self.add(beatmapset) logger.debug(f"added missing beatmapset {beatmapset_id} ") return True @@ -165,11 +170,8 @@ class BeatmapsetUpdateService: logger.opt(colors=True).info(f"added {total} missing beatmapset") self._adding_missing = False - async def add(self, beatmapset: BeatmapsetResp): - if ( - BeatmapRankStatus(beatmapset.ranked).has_pp() - or BeatmapRankStatus(beatmapset.ranked) == BeatmapRankStatus.LOVED - ): + async def add(self, beatmapset: BeatmapsetResp, calculate_next_sync: bool = True): + if BeatmapRankStatus(beatmapset.ranked).ranked(): return async with with_db() as session: sync_record = await session.get(BeatmapSync, beatmapset.id) @@ -198,20 +200,104 @@ class BeatmapsetUpdateService: for bm in beatmapset.beatmaps ] sync_record.beatmap_status = BeatmapRankStatus(beatmapset.ranked) - - processing = ProcessingBeatmapset(beatmapset, sync_record) - next_time_delta = processing.calculate_next_sync_time() - if not next_time_delta: - logger.opt(colors=True).info( - f"[{beatmapset.id}] beatmapset has transformed to ranked or loved, removing from sync list" - ) - await session.delete(sync_record) - await session.commit() - return - sync_record.next_sync_time = utcnow() + next_time_delta + if calculate_next_sync: + processing = ProcessingBeatmapset(beatmapset, sync_record) + next_time_delta = processing.calculate_next_sync_time() + if not next_time_delta: + logger.opt(colors=True).info( + f"[{beatmapset.id}] beatmapset has transformed to " + "ranked or loved, removing from sync list" + ) + await session.delete(sync_record) + await session.commit() + return + sync_record.next_sync_time = utcnow() + next_time_delta logger.opt(colors=True).info(f"[{beatmapset.id}] next sync at {sync_record.next_sync_time}") await session.commit() + async def _sync_immediately(self, beatmapset: BeatmapsetResp) -> None: + async with with_db() as session: + record = await session.get(BeatmapSync, beatmapset.id) + if not record: + record = BeatmapSync( + beatmapset_id=beatmapset.id, + beatmaps=[], + beatmap_status=BeatmapRankStatus(beatmapset.ranked), + ) + session.add(record) + await session.commit() + await session.refresh(record) + await self.sync(record, session, beatmapset=beatmapset) + await session.commit() + + async def sync( + self, + record: BeatmapSync, + session: AsyncSession, + *, + beatmapset: BeatmapsetResp | None = None, + ): + logger.opt(colors=True).info(f"[{record.beatmapset_id}] syncing...") + if beatmapset is None: + try: + beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id) + except Exception as e: + if isinstance(e, HTTPStatusError) and e.response.status_code == 404: + logger.opt(colors=True).warning( + f"[{record.beatmapset_id}] beatmapset not found (404), removing from sync list" + ) + await session.delete(record) + await session.commit() + return + if isinstance(e, HTTPError): + logger.opt(colors=True).warning( + f"[{record.beatmapset_id}] " + f"failed to fetch beatmapset: [{e.__class__.__name__}] {e}, retrying later" + ) + else: + logger.opt(colors=True).exception( + f"[{record.beatmapset_id}] unexpected error: {e}, retrying later" + ) + record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA) + return + processing = ProcessingBeatmapset(beatmapset, record) + changed_beatmaps = processing.changed_beatmaps + changed = processing.beatmapset_changed or changed_beatmaps + if changed: + record.beatmaps = [ + SavedBeatmapMeta( + beatmap_id=bm.id, + md5=bm.checksum, + is_deleted=False, + beatmap_status=BeatmapRankStatus(bm.ranked), + ) + for bm in beatmapset.beatmaps + ] + record.beatmap_status = BeatmapRankStatus(beatmapset.ranked) + record.consecutive_no_change = 0 + + bg_tasks.add_task( + self._process_changed_beatmaps, + changed_beatmaps, + ) + bg_tasks.add_task( + self._process_changed_beatmapset, + beatmapset, + ) + else: + record.consecutive_no_change += 1 + + next_time_delta = processing.calculate_next_sync_time() + if not next_time_delta: + logger.opt(colors=True).info( + f"[{beatmapset.id}] beatmapset has transformed to ranked or loved," + f" removing from sync list" + ) + await session.delete(record) + else: + record.next_sync_time = utcnow() + next_time_delta + logger.opt(colors=True).info(f"[{record.beatmapset_id}] next sync at {record.next_sync_time}") + async def _update_beatmaps(self): async with with_db() as session: logger.info("checking for beatmapset updates...") @@ -222,67 +308,7 @@ class BeatmapsetUpdateService: .order_by(col(BeatmapSync.next_sync_time).desc()) ) for record in records: - logger.opt(colors=True).info(f"[{record.beatmapset_id}] syncing...") - try: - beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id) - except Exception as e: - if isinstance(e, HTTPStatusError) and e.response.status_code == 404: - logger.opt(colors=True).warning( - f"[{record.beatmapset_id}] beatmapset not found (404), removing from sync list" - ) - await session.delete(record) - await session.commit() - continue - if isinstance(e, HTTPError): - logger.opt(colors=True).warning( - f"[{record.beatmapset_id}] " - f"failed to fetch beatmapset: [{e.__class__.__name__}] {e}, retrying later" - ) - else: - logger.opt(colors=True).exception( - f"[{record.beatmapset_id}] unexpected error: {e}, retrying later" - ) - record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA) - continue - processing = ProcessingBeatmapset(beatmapset, record) - changed_beatmaps = processing.changed_beatmaps - changed = processing.beatmapset_changed or changed_beatmaps - if changed: - record.beatmaps = [ - SavedBeatmapMeta( - beatmap_id=bm.id, - md5=bm.checksum, - is_deleted=False, - beatmap_status=BeatmapRankStatus(bm.ranked), - ) - for bm in beatmapset.beatmaps - ] - record.beatmap_status = BeatmapRankStatus(beatmapset.ranked) - record.consecutive_no_change = 0 - - bg_tasks.add_task( - self._process_changed_beatmaps, - changed_beatmaps, - ) - bg_tasks.add_task( - self._process_changed_beatmapset, - beatmapset, - ) - else: - record.consecutive_no_change += 1 - - next_time_delta = processing.calculate_next_sync_time() - if not next_time_delta: - logger.opt(colors=True).info( - f"[{beatmapset.id}] beatmapset has transformed to ranked or loved," - f" removing from sync list" - ) - await session.delete(record) - else: - record.next_sync_time = utcnow() + next_time_delta - logger.opt(colors=True).info( - f"[{record.beatmapset_id}] next sync at {record.next_sync_time}" - ) + await self.sync(record, session) await session.commit() async def _process_changed_beatmapset(self, beatmapset: BeatmapsetResp): @@ -382,7 +408,8 @@ def init_beatmapset_update_service(fetcher: "Fetcher") -> BeatmapsetUpdateServic global service if service is None: service = BeatmapsetUpdateService(fetcher) - bg_tasks.add_task(service.add_missing_beatmapsets) + if settings.enable_auto_beatmap_sync: + bg_tasks.add_task(service.add_missing_beatmapsets) return service diff --git a/app/tasks/beatmapset_update.py b/app/tasks/beatmapset_update.py index ebda7bb..38fd07c 100644 --- a/app/tasks/beatmapset_update.py +++ b/app/tasks/beatmapset_update.py @@ -1,19 +1,19 @@ from datetime import datetime, timedelta +from app.config import settings from app.dependencies.scheduler import get_scheduler from app.service.beatmapset_update_service import get_beatmapset_update_service from app.utils import bg_tasks -SCHEDULER_INTERVAL_MINUTES = 2 +if settings.enable_auto_beatmap_sync: - -@get_scheduler().scheduled_job( - "interval", - id="update_beatmaps", - minutes=SCHEDULER_INTERVAL_MINUTES, - next_run_time=datetime.now() + timedelta(minutes=1), -) -async def beatmapset_update_job(): - service = get_beatmapset_update_service() - bg_tasks.add_task(service.add_missing_beatmapsets) - await service._update_beatmaps() + @get_scheduler().scheduled_job( + "interval", + id="update_beatmaps", + minutes=settings.beatmap_sync_interval_minutes, + next_run_time=datetime.now() + timedelta(minutes=1), + ) + async def beatmapset_update_job(): + service = get_beatmapset_update_service() + bg_tasks.add_task(service.add_missing_beatmapsets) + await service._update_beatmaps()