feat(sync): add configuration to control syncing & support sync immediately by API

This commit is contained in:
MingxuanGame
2025-10-05 03:44:46 +00:00
parent 8884f8993c
commit 1db34bf5c5
4 changed files with 140 additions and 98 deletions

View File

@@ -17,6 +17,7 @@ from app.utils import bg_tasks, utcnow
from httpx import HTTPError, HTTPStatusError
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
if TYPE_CHECKING:
from app.fetcher import Fetcher
@@ -67,7 +68,7 @@ class ProcessingBeatmapset:
def calculate_next_sync_time(
self,
) -> timedelta | None:
if self.status.has_pp() or self.status == BeatmapRankStatus.LOVED:
if self.status.ranked():
return None
now = utcnow()
@@ -125,12 +126,16 @@ class BeatmapsetUpdateService:
self.fetcher = fetcher
self._adding_missing = False
async def add_missing_beatmapset(self, beatmapset_id: int) -> bool:
async def add_missing_beatmapset(self, beatmapset_id: int, immediate: bool = False) -> bool:
beatmapset = await self.fetcher.get_beatmapset(beatmapset_id)
await self.add(beatmapset)
status = BeatmapRankStatus(beatmapset.ranked)
if status.has_pp() or status == BeatmapRankStatus.LOVED:
if status.ranked():
return False
if immediate:
await self._sync_immediately(beatmapset)
logger.debug(f"triggered immediate sync for beatmapset {beatmapset_id} ")
return True
await self.add(beatmapset)
logger.debug(f"added missing beatmapset {beatmapset_id} ")
return True
@@ -165,11 +170,8 @@ class BeatmapsetUpdateService:
logger.opt(colors=True).info(f"added {total} missing beatmapset")
self._adding_missing = False
async def add(self, beatmapset: BeatmapsetResp):
if (
BeatmapRankStatus(beatmapset.ranked).has_pp()
or BeatmapRankStatus(beatmapset.ranked) == BeatmapRankStatus.LOVED
):
async def add(self, beatmapset: BeatmapsetResp, calculate_next_sync: bool = True):
if BeatmapRankStatus(beatmapset.ranked).ranked():
return
async with with_db() as session:
sync_record = await session.get(BeatmapSync, beatmapset.id)
@@ -198,20 +200,104 @@ class BeatmapsetUpdateService:
for bm in beatmapset.beatmaps
]
sync_record.beatmap_status = BeatmapRankStatus(beatmapset.ranked)
processing = ProcessingBeatmapset(beatmapset, sync_record)
next_time_delta = processing.calculate_next_sync_time()
if not next_time_delta:
logger.opt(colors=True).info(
f"<g>[{beatmapset.id}]</g> beatmapset has transformed to ranked or loved, removing from sync list"
)
await session.delete(sync_record)
await session.commit()
return
sync_record.next_sync_time = utcnow() + next_time_delta
if calculate_next_sync:
processing = ProcessingBeatmapset(beatmapset, sync_record)
next_time_delta = processing.calculate_next_sync_time()
if not next_time_delta:
logger.opt(colors=True).info(
f"<g>[{beatmapset.id}]</g> beatmapset has transformed to "
"ranked or loved, removing from sync list"
)
await session.delete(sync_record)
await session.commit()
return
sync_record.next_sync_time = utcnow() + next_time_delta
logger.opt(colors=True).info(f"<g>[{beatmapset.id}]</g> next sync at {sync_record.next_sync_time}")
await session.commit()
async def _sync_immediately(self, beatmapset: BeatmapsetResp) -> None:
async with with_db() as session:
record = await session.get(BeatmapSync, beatmapset.id)
if not record:
record = BeatmapSync(
beatmapset_id=beatmapset.id,
beatmaps=[],
beatmap_status=BeatmapRankStatus(beatmapset.ranked),
)
session.add(record)
await session.commit()
await session.refresh(record)
await self.sync(record, session, beatmapset=beatmapset)
await session.commit()
async def sync(
self,
record: BeatmapSync,
session: AsyncSession,
*,
beatmapset: BeatmapsetResp | None = None,
):
logger.opt(colors=True).info(f"<g>[{record.beatmapset_id}]</g> syncing...")
if beatmapset is None:
try:
beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id)
except Exception as e:
if isinstance(e, HTTPStatusError) and e.response.status_code == 404:
logger.opt(colors=True).warning(
f"<g>[{record.beatmapset_id}]</g> beatmapset not found (404), removing from sync list"
)
await session.delete(record)
await session.commit()
return
if isinstance(e, HTTPError):
logger.opt(colors=True).warning(
f"<g>[{record.beatmapset_id}]</g> "
f"failed to fetch beatmapset: [{e.__class__.__name__}] {e}, retrying later"
)
else:
logger.opt(colors=True).exception(
f"<g>[{record.beatmapset_id}]</g> unexpected error: {e}, retrying later"
)
record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA)
return
processing = ProcessingBeatmapset(beatmapset, record)
changed_beatmaps = processing.changed_beatmaps
changed = processing.beatmapset_changed or changed_beatmaps
if changed:
record.beatmaps = [
SavedBeatmapMeta(
beatmap_id=bm.id,
md5=bm.checksum,
is_deleted=False,
beatmap_status=BeatmapRankStatus(bm.ranked),
)
for bm in beatmapset.beatmaps
]
record.beatmap_status = BeatmapRankStatus(beatmapset.ranked)
record.consecutive_no_change = 0
bg_tasks.add_task(
self._process_changed_beatmaps,
changed_beatmaps,
)
bg_tasks.add_task(
self._process_changed_beatmapset,
beatmapset,
)
else:
record.consecutive_no_change += 1
next_time_delta = processing.calculate_next_sync_time()
if not next_time_delta:
logger.opt(colors=True).info(
f"<yellow>[{beatmapset.id}]</yellow> beatmapset has transformed to ranked or loved,"
f" removing from sync list"
)
await session.delete(record)
else:
record.next_sync_time = utcnow() + next_time_delta
logger.opt(colors=True).info(f"<g>[{record.beatmapset_id}]</g> next sync at {record.next_sync_time}")
async def _update_beatmaps(self):
async with with_db() as session:
logger.info("checking for beatmapset updates...")
@@ -222,67 +308,7 @@ class BeatmapsetUpdateService:
.order_by(col(BeatmapSync.next_sync_time).desc())
)
for record in records:
logger.opt(colors=True).info(f"<g>[{record.beatmapset_id}]</g> syncing...")
try:
beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id)
except Exception as e:
if isinstance(e, HTTPStatusError) and e.response.status_code == 404:
logger.opt(colors=True).warning(
f"<g>[{record.beatmapset_id}]</g> beatmapset not found (404), removing from sync list"
)
await session.delete(record)
await session.commit()
continue
if isinstance(e, HTTPError):
logger.opt(colors=True).warning(
f"<g>[{record.beatmapset_id}]</g> "
f"failed to fetch beatmapset: [{e.__class__.__name__}] {e}, retrying later"
)
else:
logger.opt(colors=True).exception(
f"<g>[{record.beatmapset_id}]</g> unexpected error: {e}, retrying later"
)
record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA)
continue
processing = ProcessingBeatmapset(beatmapset, record)
changed_beatmaps = processing.changed_beatmaps
changed = processing.beatmapset_changed or changed_beatmaps
if changed:
record.beatmaps = [
SavedBeatmapMeta(
beatmap_id=bm.id,
md5=bm.checksum,
is_deleted=False,
beatmap_status=BeatmapRankStatus(bm.ranked),
)
for bm in beatmapset.beatmaps
]
record.beatmap_status = BeatmapRankStatus(beatmapset.ranked)
record.consecutive_no_change = 0
bg_tasks.add_task(
self._process_changed_beatmaps,
changed_beatmaps,
)
bg_tasks.add_task(
self._process_changed_beatmapset,
beatmapset,
)
else:
record.consecutive_no_change += 1
next_time_delta = processing.calculate_next_sync_time()
if not next_time_delta:
logger.opt(colors=True).info(
f"<yellow>[{beatmapset.id}]</yellow> beatmapset has transformed to ranked or loved,"
f" removing from sync list"
)
await session.delete(record)
else:
record.next_sync_time = utcnow() + next_time_delta
logger.opt(colors=True).info(
f"<g>[{record.beatmapset_id}]</g> next sync at {record.next_sync_time}"
)
await self.sync(record, session)
await session.commit()
async def _process_changed_beatmapset(self, beatmapset: BeatmapsetResp):
@@ -382,7 +408,8 @@ def init_beatmapset_update_service(fetcher: "Fetcher") -> BeatmapsetUpdateServic
global service
if service is None:
service = BeatmapsetUpdateService(fetcher)
bg_tasks.add_task(service.add_missing_beatmapsets)
if settings.enable_auto_beatmap_sync:
bg_tasks.add_task(service.add_missing_beatmapsets)
return service