feat(beatmap,score): update beatmaps from Bancho & deleting scores (#50)
New API:
- DELETE /api/private/score/{score_id}: delete a score
- POST /api/private/beatmapsets/{beatmapset_id}/sync: request for syncing a beatmapset
New configuration:
- OLD_SCORE_PROCESSING_MODE
This commit is contained in:
371
app/service/beatmapset_update_service.py
Normal file
371
app/service/beatmapset_update_service.py
Normal file
@@ -0,0 +1,371 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from enum import Enum
|
||||
import math
|
||||
import random
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from app.config import OldScoreProcessingMode, settings
|
||||
from app.database.beatmap import Beatmap
|
||||
from app.database.beatmap_sync import BeatmapSync, SavedBeatmapMeta
|
||||
from app.database.beatmapset import Beatmapset, BeatmapsetResp
|
||||
from app.database.score import Score
|
||||
from app.dependencies.database import with_db
|
||||
from app.dependencies.scheduler import get_scheduler
|
||||
from app.dependencies.storage import get_storage_service
|
||||
from app.log import logger
|
||||
from app.models.beatmap import BeatmapRankStatus
|
||||
from app.utils import bg_tasks, utcnow
|
||||
|
||||
from httpx import HTTPError
|
||||
from sqlmodel import col, select
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.fetcher import Fetcher
|
||||
|
||||
|
||||
class BeatmapChangeType(Enum):
|
||||
MAP_UPDATED = "map_updated"
|
||||
MAP_DELETED = "map_deleted"
|
||||
MAP_ADDED = "map_added"
|
||||
STATUS_CHANGED = "status_changed"
|
||||
|
||||
|
||||
class BeatmapsetChangeType(Enum):
|
||||
STATUS_CHANGED = "status_changed"
|
||||
HYPE_CHANGED = "hype_changed"
|
||||
NOMINATIONS_CHANGED = "nominations_changed"
|
||||
RANKED_DATE_CHANGED = "ranked_date_changed"
|
||||
PLAYCOUNT_CHANGED = "playcount_changed"
|
||||
|
||||
|
||||
class ChangedBeatmap(NamedTuple):
|
||||
beatmap_id: int
|
||||
type: BeatmapChangeType
|
||||
|
||||
|
||||
BASE = 1200
|
||||
TAU = 3600
|
||||
JITTER_MIN = -30
|
||||
JITTER_MAX = 30
|
||||
MIN_DELTA = 1200
|
||||
GROWTH = 2.0
|
||||
GRAVEYARD_DOUBLING_PERIOD_DAYS = 30
|
||||
GRAVEYARD_MAX_DAYS = 365
|
||||
STATUS_FACTOR: dict[BeatmapRankStatus, float] = {
|
||||
BeatmapRankStatus.WIP: 0.5,
|
||||
BeatmapRankStatus.PENDING: 0.5,
|
||||
BeatmapRankStatus.GRAVEYARD: 1,
|
||||
}
|
||||
SCHEDULER_INTERVAL_MINUTES = 2
|
||||
|
||||
|
||||
class ProcessingBeatmapset:
|
||||
def __init__(self, beatmapset: BeatmapsetResp, record: BeatmapSync) -> None:
|
||||
self.beatmapset = beatmapset
|
||||
self.status = BeatmapRankStatus(self.beatmapset.ranked)
|
||||
self.record = record
|
||||
|
||||
def calculate_next_sync_time(
|
||||
self,
|
||||
) -> timedelta | None:
|
||||
if self.status.has_pp() or self.status == BeatmapRankStatus.LOVED:
|
||||
return None
|
||||
|
||||
now = utcnow()
|
||||
if self.status == BeatmapRankStatus.QUALIFIED:
|
||||
assert self.beatmapset.ranked_date is not None, "ranked_date should not be None for qualified maps"
|
||||
time_to_ranked = (self.beatmapset.ranked_date + timedelta(days=7) - now).total_seconds()
|
||||
baseline = max(MIN_DELTA, time_to_ranked / 2)
|
||||
next_delta = max(MIN_DELTA, baseline)
|
||||
elif self.status in {BeatmapRankStatus.WIP, BeatmapRankStatus.PENDING}:
|
||||
seconds_since_update = (now - self.beatmapset.last_updated).total_seconds()
|
||||
factor_update = max(1.0, seconds_since_update / TAU)
|
||||
factor_play = 1.0 + math.log(1.0 + self.beatmapset.play_count)
|
||||
status_factor = STATUS_FACTOR[self.status]
|
||||
baseline = BASE * factor_play / factor_update * status_factor
|
||||
next_delta = max(MIN_DELTA, baseline * (GROWTH ** (self.record.consecutive_no_change + 1)))
|
||||
elif self.status == BeatmapRankStatus.GRAVEYARD:
|
||||
days_since_update = (now - self.beatmapset.last_updated).days
|
||||
doubling_periods = days_since_update / GRAVEYARD_DOUBLING_PERIOD_DAYS
|
||||
delta = MIN_DELTA * (2**doubling_periods)
|
||||
max_seconds = GRAVEYARD_MAX_DAYS * 86400
|
||||
next_delta = min(max_seconds, delta)
|
||||
else:
|
||||
next_delta = MIN_DELTA
|
||||
jitter = timedelta(minutes=random.randint(JITTER_MIN, JITTER_MAX))
|
||||
return timedelta(seconds=next_delta) + jitter
|
||||
|
||||
@property
|
||||
def beatmapset_changed(self) -> bool:
|
||||
return self.record.beatmap_status != BeatmapRankStatus(self.beatmapset.ranked)
|
||||
|
||||
@property
|
||||
def changed_beatmaps(self) -> list[ChangedBeatmap]:
|
||||
changed_beatmaps = []
|
||||
for bm in self.beatmapset.beatmaps:
|
||||
saved = next((s for s in self.record.beatmaps if s["beatmap_id"] == bm.id), None)
|
||||
if not saved:
|
||||
changed_beatmaps.append(ChangedBeatmap(bm.id, BeatmapChangeType.MAP_ADDED))
|
||||
elif saved["is_deleted"]:
|
||||
changed_beatmaps.append(ChangedBeatmap(bm.id, BeatmapChangeType.MAP_ADDED))
|
||||
elif saved["md5"] != bm.checksum:
|
||||
changed_beatmaps.append(ChangedBeatmap(bm.id, BeatmapChangeType.MAP_UPDATED))
|
||||
elif saved["beatmap_status"] != BeatmapRankStatus(bm.ranked):
|
||||
changed_beatmaps.append(ChangedBeatmap(bm.id, BeatmapChangeType.STATUS_CHANGED))
|
||||
for saved in self.record.beatmaps:
|
||||
if not any(bm.id == saved["beatmap_id"] for bm in self.beatmapset.beatmaps) and not saved["is_deleted"]:
|
||||
changed_beatmaps.append(ChangedBeatmap(saved["beatmap_id"], BeatmapChangeType.MAP_DELETED))
|
||||
return changed_beatmaps
|
||||
|
||||
|
||||
class BeatmapsetUpdateService:
|
||||
def __init__(self, fetcher: "Fetcher"):
|
||||
self.fetcher = fetcher
|
||||
|
||||
async def add_missing_beatmapset(self, beatmapset_id: int) -> bool:
|
||||
beatmapset = await self.fetcher.get_beatmapset(beatmapset_id)
|
||||
await self.add(beatmapset)
|
||||
status = BeatmapRankStatus(beatmapset.ranked)
|
||||
if status.has_pp() or status == BeatmapRankStatus.LOVED:
|
||||
return False
|
||||
logger.opt(colors=True).debug(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> added missing beatmapset {beatmapset_id} "
|
||||
)
|
||||
return True
|
||||
|
||||
async def add_missing_beatmapsets(self):
|
||||
async with with_db() as session:
|
||||
missings = await session.exec(
|
||||
select(Beatmapset.id).where(
|
||||
col(Beatmapset.beatmap_status).in_(
|
||||
[
|
||||
BeatmapRankStatus.WIP,
|
||||
BeatmapRankStatus.PENDING,
|
||||
BeatmapRankStatus.GRAVEYARD,
|
||||
BeatmapRankStatus.QUALIFIED,
|
||||
]
|
||||
),
|
||||
col(Beatmapset.id).notin_(select(BeatmapSync.beatmapset_id)),
|
||||
)
|
||||
)
|
||||
total = 0
|
||||
for missing in missings:
|
||||
if await self.add_missing_beatmapset(missing):
|
||||
total += 1
|
||||
if total > 0:
|
||||
logger.opt(colors=True).info(f"<cyan>[BeatmapsetUpdateService]</cyan> added {total} missing beatmapset")
|
||||
|
||||
async def add(self, beatmapset: BeatmapsetResp):
|
||||
if (
|
||||
BeatmapRankStatus(beatmapset.ranked).has_pp()
|
||||
or BeatmapRankStatus(beatmapset.ranked) == BeatmapRankStatus.LOVED
|
||||
):
|
||||
return
|
||||
async with with_db() as session:
|
||||
sync_record = await session.get(BeatmapSync, beatmapset.id)
|
||||
if not sync_record:
|
||||
sync_record = BeatmapSync(
|
||||
beatmapset_id=beatmapset.id,
|
||||
beatmaps=[
|
||||
SavedBeatmapMeta(
|
||||
beatmap_id=bm.id,
|
||||
md5=bm.checksum,
|
||||
is_deleted=False,
|
||||
beatmap_status=BeatmapRankStatus(bm.ranked),
|
||||
)
|
||||
for bm in beatmapset.beatmaps
|
||||
],
|
||||
beatmap_status=BeatmapRankStatus(beatmapset.ranked),
|
||||
)
|
||||
session.add(sync_record)
|
||||
await session.commit()
|
||||
await session.refresh(sync_record)
|
||||
else:
|
||||
sync_record.beatmaps = [
|
||||
SavedBeatmapMeta(
|
||||
beatmap_id=bm.id, md5=bm.checksum, is_deleted=False, beatmap_status=BeatmapRankStatus(bm.ranked)
|
||||
)
|
||||
for bm in beatmapset.beatmaps
|
||||
]
|
||||
sync_record.beatmap_status = BeatmapRankStatus(beatmapset.ranked)
|
||||
|
||||
processing = ProcessingBeatmapset(beatmapset, sync_record)
|
||||
next_time_delta = processing.calculate_next_sync_time()
|
||||
if not next_time_delta:
|
||||
logger.opt(colors=True).info(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{beatmapset.id}] "
|
||||
"beatmapset has transformed to ranked or loved,"
|
||||
" removing from sync list"
|
||||
)
|
||||
await session.delete(sync_record)
|
||||
await session.commit()
|
||||
return
|
||||
sync_record.next_sync_time = utcnow() + next_time_delta
|
||||
logger.opt(colors=True).info(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{beatmapset.id}] next sync at {sync_record.next_sync_time}"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
async def _update_beatmaps(self):
|
||||
async with with_db() as session:
|
||||
logger.opt(colors=True).info("<cyan>[BeatmapsetUpdateService]</cyan> checking for beatmapset updates...")
|
||||
now = utcnow()
|
||||
records = await session.exec(select(BeatmapSync).where(BeatmapSync.next_sync_time <= now))
|
||||
for record in records:
|
||||
logger.opt(colors=True).info(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] syncing..."
|
||||
)
|
||||
try:
|
||||
beatmapset = await self.fetcher.get_beatmapset(record.beatmapset_id)
|
||||
except Exception as e:
|
||||
if isinstance(e, HTTPError):
|
||||
logger.warning(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] "
|
||||
f"failed to fetch beatmapset: {e}, retrying later"
|
||||
)
|
||||
else:
|
||||
logger.exception(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] "
|
||||
f"unexpected error: {e}, retrying later"
|
||||
)
|
||||
record.next_sync_time = utcnow() + timedelta(seconds=MIN_DELTA)
|
||||
continue
|
||||
processing = ProcessingBeatmapset(beatmapset, record)
|
||||
changed_beatmaps = processing.changed_beatmaps
|
||||
changed = processing.beatmapset_changed or changed_beatmaps
|
||||
if changed:
|
||||
record.beatmaps = [
|
||||
SavedBeatmapMeta(
|
||||
beatmap_id=bm.id,
|
||||
md5=bm.checksum,
|
||||
is_deleted=False,
|
||||
beatmap_status=BeatmapRankStatus(bm.ranked),
|
||||
)
|
||||
for bm in beatmapset.beatmaps
|
||||
]
|
||||
record.beatmap_status = BeatmapRankStatus(beatmapset.ranked)
|
||||
record.consecutive_no_change = 0
|
||||
|
||||
bg_tasks.add_task(
|
||||
self._process_changed_beatmaps,
|
||||
changed_beatmaps,
|
||||
)
|
||||
bg_tasks.add_task(
|
||||
self._process_changed_beatmapset,
|
||||
beatmapset,
|
||||
)
|
||||
else:
|
||||
record.consecutive_no_change += 1
|
||||
|
||||
next_time_delta = processing.calculate_next_sync_time()
|
||||
if not next_time_delta:
|
||||
logger.opt(colors=True).info(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] beatmapset "
|
||||
"has transformed to ranked or loved,"
|
||||
" removing from sync list"
|
||||
)
|
||||
await session.delete(record)
|
||||
else:
|
||||
record.next_sync_time = utcnow() + next_time_delta
|
||||
logger.opt(colors=True).info(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{record.beatmapset_id}] "
|
||||
f"next sync at {record.next_sync_time}"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
async def _process_changed_beatmapset(self, beatmapset: BeatmapsetResp):
|
||||
async with with_db() as session:
|
||||
db_beatmapset = await session.get(Beatmapset, beatmapset.id)
|
||||
new_beatmapset = await Beatmapset.from_resp_no_save(session, beatmapset)
|
||||
if db_beatmapset:
|
||||
await session.merge(new_beatmapset)
|
||||
await session.commit()
|
||||
|
||||
async def _process_changed_beatmaps(self, changed: list[ChangedBeatmap]):
|
||||
storage_service = get_storage_service()
|
||||
async with with_db() as session:
|
||||
|
||||
async def _process_update_or_delete_beatmaps(beatmap_id: int):
|
||||
scores = await session.exec(select(Score).where(Score.beatmap_id == beatmap_id))
|
||||
total = 0
|
||||
for score in scores:
|
||||
if settings.old_score_processing_mode == OldScoreProcessingMode.STRICT:
|
||||
await score.delete(session, storage_service)
|
||||
elif settings.old_score_processing_mode == OldScoreProcessingMode.NORMAL:
|
||||
if await score.awaitable_attrs.best_score:
|
||||
assert score.best_score is not None
|
||||
await score.best_score.delete(session)
|
||||
if await score.awaitable_attrs.ranked_score:
|
||||
assert score.ranked_score is not None
|
||||
await score.ranked_score.delete(session)
|
||||
total += 1
|
||||
if total > 0:
|
||||
logger.opt(colors=True).info(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [beatmap: {beatmap_id}] processed {total} old scores"
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
for change in changed:
|
||||
if change.type == BeatmapChangeType.MAP_ADDED:
|
||||
try:
|
||||
beatmap = await self.fetcher.get_beatmap(change.beatmap_id)
|
||||
except Exception as e:
|
||||
logger.opt(colors=True).error(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [beatmap: {change.beatmap_id}] "
|
||||
f"failed to fetch added beatmap: {e}, skipping"
|
||||
)
|
||||
continue
|
||||
logger.opt(colors=True).info(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{beatmap.beatmapset_id}] adding beatmap {beatmap.id}"
|
||||
)
|
||||
await Beatmap.from_resp_no_save(session, beatmap)
|
||||
else:
|
||||
try:
|
||||
beatmap = await self.fetcher.get_beatmap(change.beatmap_id)
|
||||
except Exception as e:
|
||||
logger.opt(colors=True).error(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [beatmap: {change.beatmap_id}] "
|
||||
f"failed to fetch changed beatmap: {e}, skipping"
|
||||
)
|
||||
continue
|
||||
logger.opt(colors=True).info(
|
||||
f"<cyan>[BeatmapsetUpdateService]</cyan> [{beatmap.beatmapset_id}] processing beatmap "
|
||||
f"{beatmap.id} change {change.type}"
|
||||
)
|
||||
new_db_beatmap = await Beatmap.from_resp_no_save(session, beatmap)
|
||||
existing_beatmap = await session.get(Beatmap, change.beatmap_id)
|
||||
if existing_beatmap:
|
||||
await session.merge(new_db_beatmap)
|
||||
await session.commit()
|
||||
if change.type != BeatmapChangeType.STATUS_CHANGED:
|
||||
await _process_update_or_delete_beatmaps(change.beatmap_id)
|
||||
|
||||
|
||||
service: BeatmapsetUpdateService | None = None
|
||||
|
||||
|
||||
def init_beatmapset_update_service(fetcher: "Fetcher") -> BeatmapsetUpdateService:
|
||||
global service
|
||||
if service is None:
|
||||
service = BeatmapsetUpdateService(fetcher)
|
||||
bg_tasks.add_task(service.add_missing_beatmapsets)
|
||||
return service
|
||||
|
||||
|
||||
def get_beatmapset_update_service() -> BeatmapsetUpdateService:
|
||||
assert service is not None, "BeatmapsetUpdateService is not initialized"
|
||||
return service
|
||||
|
||||
|
||||
@get_scheduler().scheduled_job(
|
||||
"interval",
|
||||
id="update_beatmaps",
|
||||
minutes=SCHEDULER_INTERVAL_MINUTES,
|
||||
next_run_time=datetime.now() + timedelta(minutes=1),
|
||||
)
|
||||
async def beatmapset_update_job():
|
||||
if service is not None:
|
||||
bg_tasks.add_task(service.add_missing_beatmapsets)
|
||||
await service._update_beatmaps()
|
||||
@@ -14,6 +14,7 @@ from app.const import BANCHOBOT_ID
|
||||
from app.database import User, UserResp
|
||||
from app.database.lazer_user import SEARCH_INCLUDED
|
||||
from app.database.score import LegacyScoreResp, ScoreResp
|
||||
from app.dependencies.database import with_db
|
||||
from app.log import logger
|
||||
from app.models.score import GameMode
|
||||
from app.service.asset_proxy_service import get_asset_proxy_service
|
||||
@@ -382,3 +383,14 @@ def get_user_cache_service(redis: Redis) -> UserCacheService:
|
||||
if _user_cache_service is None:
|
||||
_user_cache_service = UserCacheService(redis)
|
||||
return _user_cache_service
|
||||
|
||||
|
||||
async def refresh_user_cache_background(redis: Redis, user_id: int, mode: GameMode):
|
||||
"""后台任务:刷新用户缓存"""
|
||||
try:
|
||||
user_cache_service = get_user_cache_service(redis)
|
||||
# 创建独立的数据库会话
|
||||
async with with_db() as session:
|
||||
await user_cache_service.refresh_user_cache_on_score_submit(session, user_id, mode)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to refresh user cache after score submit: {e}")
|
||||
|
||||
Reference in New Issue
Block a user