perf(score): divide score processing into small parts and make them run in background

resolve #47
This commit is contained in:
MingxuanGame
2025-10-02 14:30:57 +00:00
parent bb1c09f4fd
commit f31056ced3
4 changed files with 389 additions and 169 deletions

View File

@@ -16,6 +16,7 @@ from app.calculator import (
from app.config import settings
from app.database.team import TeamMember
from app.dependencies.database import get_redis
from app.log import logger
from app.models.beatmap import BeatmapRankStatus
from app.models.model import (
CurrentUserAttributes,
@@ -36,10 +37,10 @@ from app.storage import StorageService
from app.utils import utcnow
from .beatmap import Beatmap, BeatmapResp
from .beatmap_playcounts import process_beatmap_playcount
from .beatmapset import BeatmapsetResp
from .best_score import BestScore
from .counts import MonthlyPlaycounts
from .events import Event, EventType
from .lazer_user import User, UserResp
from .playlist_best_score import PlaylistBestScore
from .pp_best_score import PPBestScore
@@ -53,7 +54,7 @@ from pydantic import BaseModel, field_serializer, field_validator
from redis.asyncio import Redis
from sqlalchemy import Boolean, Column, DateTime, TextClause
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import Mapped, joinedload
from sqlalchemy.sql.elements import ColumnElement
from sqlmodel import (
JSON,
@@ -841,8 +842,222 @@ def calculate_playtime(score: Score, beatmap_length: int) -> tuple[int, bool]:
)
async def process_user(
async def process_score(
user: User,
beatmap_id: int,
ranked: bool,
score_token: ScoreToken,
info: SoloScoreSubmissionInfo,
session: AsyncSession,
item_id: int | None = None,
room_id: int | None = None,
) -> Score:
gamemode = GameMode.from_int(info.ruleset_id).to_special_mode(info.mods)
logger.info(
"[Score] Creating score for user {user_id} | beatmap={beatmap_id} "
"ruleset={ruleset} passed={passed} total={total}",
user_id=user.id,
beatmap_id=beatmap_id,
ruleset=gamemode,
passed=info.passed,
total=info.total_score,
)
score = Score(
accuracy=info.accuracy,
max_combo=info.max_combo,
mods=info.mods,
passed=info.passed,
rank=info.rank,
total_score=info.total_score,
total_score_without_mods=info.total_score_without_mods,
beatmap_id=beatmap_id,
ended_at=utcnow(),
gamemode=gamemode,
started_at=score_token.created_at,
user_id=user.id,
preserve=info.passed,
map_md5=score_token.beatmap.checksum,
has_replay=False,
type="solo",
n300=info.statistics.get(HitResult.GREAT, 0),
n100=info.statistics.get(HitResult.OK, 0),
n50=info.statistics.get(HitResult.MEH, 0),
nmiss=info.statistics.get(HitResult.MISS, 0),
ngeki=info.statistics.get(HitResult.PERFECT, 0),
nkatu=info.statistics.get(HitResult.GOOD, 0),
nlarge_tick_miss=info.statistics.get(HitResult.LARGE_TICK_MISS, 0),
nsmall_tick_hit=info.statistics.get(HitResult.SMALL_TICK_HIT, 0),
nlarge_tick_hit=info.statistics.get(HitResult.LARGE_TICK_HIT, 0),
nslider_tail_hit=info.statistics.get(HitResult.SLIDER_TAIL_HIT, 0),
playlist_item_id=item_id,
room_id=room_id,
maximum_statistics=info.maximum_statistics,
processed=True,
ranked=ranked,
)
session.add(score)
logger.debug(
"[Score] Score staged for commit | token={token} mods={mods} total_hits={hits}",
token=score_token.id,
mods=info.mods,
hits=sum(info.statistics.values()) if info.statistics else 0,
)
await session.commit()
await session.refresh(score)
return score
async def _process_score_pp(score: Score, session: AsyncSession, redis: Redis, fetcher: "Fetcher"):
if score.pp != 0:
logger.debug(
"[Score] Skipping PP calculation for score {score_id} | already set {pp:.2f}",
score_id=score.id,
pp=score.pp,
)
return
can_get_pp = score.passed and score.ranked and mods_can_get_pp(int(score.gamemode), score.mods)
if not can_get_pp:
logger.debug(
"[Score] Skipping PP calculation for score {score_id} | passed={passed} ranked={ranked} mods={mods}",
score_id=score.id,
passed=score.passed,
ranked=score.ranked,
mods=score.mods,
)
return
pp, successed = await pre_fetch_and_calculate_pp(score, session, redis, fetcher)
if not successed:
await redis.rpush("score:need_recalculate", score.id) # pyright: ignore[reportGeneralTypeIssues]
logger.warning("[Score] Queued score {score_id} for PP recalculation", score_id=score.id)
return
score.pp = pp
logger.info("[Score] Calculated PP for score {score_id} | pp={pp:.2f}", score_id=score.id, pp=pp)
user_id = score.user_id
beatmap_id = score.beatmap_id
previous_pp_best = await get_user_best_pp_in_beatmap(session, beatmap_id, user_id, score.gamemode)
if previous_pp_best is None or score.pp > previous_pp_best.pp:
best_score = PPBestScore(
user_id=user_id,
score_id=score.id,
beatmap_id=beatmap_id,
gamemode=score.gamemode,
pp=score.pp,
acc=score.accuracy,
)
session.add(best_score)
await session.delete(previous_pp_best) if previous_pp_best else None
logger.info(
"[Score] Updated PP best for user {user_id} | score_id={score_id} pp={pp:.2f}",
user_id=user_id,
score_id=score.id,
pp=score.pp,
)
async def _process_score_events(score: Score, session: AsyncSession):
total_users = (await session.exec(select(func.count()).select_from(User))).one()
rank_global = await get_score_position_by_id(
session,
score.beatmap_id,
score.id,
mode=score.gamemode,
user=score.user,
)
if rank_global == 0 or total_users == 0:
logger.debug(
"[Score] Skipping event creation for score {score_id} | "
"rank_global={rank_global} total_users={total_users}",
score_id=score.id,
rank_global=rank_global,
total_users=total_users,
)
return
logger.debug(
"[Score] Processing events for score {score_id} | rank_global={rank_global} total_users={total_users}",
score_id=score.id,
rank_global=rank_global,
total_users=total_users,
)
if rank_global <= min(math.ceil(float(total_users) * 0.01), 50):
rank_event = Event(
created_at=utcnow(),
type=EventType.RANK,
user_id=score.user_id,
user=score.user,
)
rank_event.event_payload = {
"scorerank": score.rank.value,
"rank": rank_global,
"mode": score.gamemode.readable(),
"beatmap": {
"title": (
f"{score.beatmap.beatmapset.artist} - {score.beatmap.beatmapset.title} [{score.beatmap.version}]"
),
"url": score.beatmap.url.replace("https://osu.ppy.sh/", settings.web_url),
},
"user": {
"username": score.user.username,
"url": settings.web_url + "users/" + str(score.user.id),
},
}
session.add(rank_event)
logger.info(
"[Score] Registered rank event for user {user_id} | score_id={score_id} rank={rank}",
user_id=score.user_id,
score_id=score.id,
rank=rank_global,
)
if rank_global == 1:
displaced_score = (
await session.exec(
select(BestScore)
.where(
BestScore.beatmap_id == score.beatmap_id,
BestScore.gamemode == score.gamemode,
)
.order_by(col(BestScore.total_score).desc())
.limit(1)
.offset(1)
)
).first()
if displaced_score and displaced_score.user_id != score.user_id:
username = (await session.exec(select(User.username).where(User.id == displaced_score.user_id))).one()
rank_lost_event = Event(
created_at=utcnow(),
type=EventType.RANK_LOST,
user_id=displaced_score.user_id,
)
rank_lost_event.event_payload = {
"mode": score.gamemode.readable(),
"beatmap": {
"title": (
f"{score.beatmap.beatmapset.artist} - {score.beatmap.beatmapset.title} "
f"[{score.beatmap.version}]"
),
"url": score.beatmap.url.replace("https://osu.ppy.sh/", settings.web_url),
},
"user": {
"username": username,
"url": settings.web_url + "users/" + str(displaced_score.user.id),
},
}
session.add(rank_lost_event)
logger.info(
"[Score] Registered rank lost event | displaced_user={user_id} new_score_id={score_id}",
user_id=displaced_score.user_id,
score_id=score.id,
)
logger.debug(
"[Score] Event processing committed for score {score_id}",
score_id=score.id,
)
async def _process_statistics(
session: AsyncSession,
redis: Redis,
user: User,
score: Score,
score_token: int,
@@ -858,6 +1073,12 @@ async def process_user(
previous_score_best_mod = await get_user_best_score_with_mod_in_beatmap(
session, score.beatmap_id, user.id, mod_for_save, score.gamemode
)
logger.debug(
"[Score] Existing best scores for user {user_id} | global={global_id} mod={mod_id}",
user_id=user.id,
global_id=previous_score_best.score_id if previous_score_best else None,
mod_id=previous_score_best_mod.score_id if previous_score_best_mod else None,
)
add_to_db = False
mouthly_playcount = (
await session.exec(
@@ -882,6 +1103,11 @@ async def process_user(
# pc, pt, tth, tts
statistics.total_score += score.total_score
difference = score.total_score - previous_score_best.total_score if previous_score_best else score.total_score
logger.debug(
"[Score] Score delta computed for {score_id}: {difference}",
score_id=score.id,
difference=difference,
)
if difference > 0 and score.passed and ranked:
match score.rank:
case Rank.X:
@@ -924,18 +1150,35 @@ async def process_user(
mods=mod_for_save,
)
)
logger.info(
"[Score] Created new best score entry for user {user_id} | score_id={score_id} mods={mods}",
user_id=user.id,
score_id=score.id,
mods=mod_for_save,
)
# 情况3: 有最佳分数记录和该mod组合的记录且是同一个记录更新得分更高的情况
elif previous_score_best.score_id == previous_score_best_mod.score_id and difference > 0:
previous_score_best.total_score = score.total_score
previous_score_best.rank = score.rank
previous_score_best.score_id = score.id
logger.info(
"[Score] Updated existing best score for user {user_id} | score_id={score_id} total={total}",
user_id=user.id,
score_id=score.id,
total=score.total_score,
)
# 情况4: 有最佳分数记录和该mod组合的记录但不是同一个记录
elif previous_score_best.score_id != previous_score_best_mod.score_id:
# 更新全局最佳记录(如果新分数更高)
if difference > 0:
# 下方的 if 一定会触发。将高分设置为此分数,删除自己防止重复的 score_id
logger.info(
"[Score] Replacing global best score for user {user_id} | old_score_id={old_score_id}",
user_id=user.id,
old_score_id=previous_score_best.score_id,
)
await session.delete(previous_score_best)
# 更新mod特定最佳记录如果新分数更高
@@ -944,6 +1187,12 @@ async def process_user(
previous_score_best_mod.total_score = score.total_score
previous_score_best_mod.rank = score.rank
previous_score_best_mod.score_id = score.id
logger.info(
"[Score] Replaced mod-specific best for user {user_id} | mods={mods} score_id={score_id}",
user_id=user.id,
mods=mod_for_save,
score_id=score.id,
)
playtime, is_valid = calculate_playtime(score, beatmap_length)
if is_valid:
@@ -952,101 +1201,100 @@ async def process_user(
statistics.play_count += 1
mouthly_playcount.count += 1
statistics.play_time += playtime
with session.no_autoflush:
await process_beatmap_playcount(session, user.id, score.beatmap_id)
logger.debug(
"[Score] Recorded playtime {playtime}s for score {score_id} (user {user_id})",
playtime=playtime,
score_id=score.id,
user_id=user.id,
)
else:
logger.debug(
"[Score] Playtime {playtime}s for score {score_id} did not meet validity checks",
playtime=playtime,
score_id=score.id,
)
nlarge_tick_miss = score.nlarge_tick_miss or 0
nsmall_tick_hit = score.nsmall_tick_hit or 0
nlarge_tick_hit = score.nlarge_tick_hit or 0
statistics.count_100 += score.n100 + score.nkatu
statistics.count_300 += score.n300 + score.ngeki
statistics.count_50 += score.n50
statistics.count_miss += score.nmiss
statistics.total_hits += score.n300 + score.n100 + score.n50 + score.ngeki + score.nkatu
statistics.total_hits += (
score.n300
+ score.n100
+ score.n50
+ score.ngeki
+ score.nkatu
+ nlarge_tick_hit
+ nlarge_tick_miss
+ nsmall_tick_hit
)
if score.gamemode in {GameMode.FRUITS, GameMode.FRUITSRX}:
statistics.count_miss += nlarge_tick_miss
statistics.count_50 += nsmall_tick_hit
statistics.count_100 += nlarge_tick_hit
if score.passed and has_pp:
with session.no_autoflush:
statistics.pp, statistics.hit_accuracy = await calculate_user_pp(
session, statistics.user_id, score.gamemode
)
statistics.pp, statistics.hit_accuracy = await calculate_user_pp(session, statistics.user_id, score.gamemode)
if add_to_db:
session.add(mouthly_playcount)
await session.commit()
await session.refresh(user)
logger.debug(
"[Score] Created monthly playcount record for user {user_id} ({year}-{month})",
user_id=user.id,
year=mouthly_playcount.year,
month=mouthly_playcount.month,
)
async def process_score(
user: User,
beatmap_id: int,
ranked: bool,
score_token: ScoreToken,
info: SoloScoreSubmissionInfo,
fetcher: "Fetcher",
async def process_user(
session: AsyncSession,
redis: Redis,
item_id: int | None = None,
room_id: int | None = None,
) -> Score:
can_get_pp = info.passed and ranked and mods_can_get_pp(info.ruleset_id, info.mods)
gamemode = GameMode.from_int(info.ruleset_id).to_special_mode(info.mods)
score = Score(
accuracy=info.accuracy,
max_combo=info.max_combo,
mods=info.mods,
passed=info.passed,
rank=info.rank,
total_score=info.total_score,
total_score_without_mods=info.total_score_without_mods,
beatmap_id=beatmap_id,
ended_at=utcnow(),
gamemode=gamemode,
started_at=score_token.created_at,
user_id=user.id,
preserve=info.passed,
map_md5=score_token.beatmap.checksum,
has_replay=False,
type="solo",
n300=info.statistics.get(HitResult.GREAT, 0),
n100=info.statistics.get(HitResult.OK, 0),
n50=info.statistics.get(HitResult.MEH, 0),
nmiss=info.statistics.get(HitResult.MISS, 0),
ngeki=info.statistics.get(HitResult.PERFECT, 0),
nkatu=info.statistics.get(HitResult.GOOD, 0),
nlarge_tick_miss=info.statistics.get(HitResult.LARGE_TICK_MISS, 0),
nsmall_tick_hit=info.statistics.get(HitResult.SMALL_TICK_HIT, 0),
nlarge_tick_hit=info.statistics.get(HitResult.LARGE_TICK_HIT, 0),
nslider_tail_hit=info.statistics.get(HitResult.SLIDER_TAIL_HIT, 0),
playlist_item_id=item_id,
room_id=room_id,
maximum_statistics=info.maximum_statistics,
processed=True,
ranked=ranked,
)
successed = True
if can_get_pp:
pp, successed = await pre_fetch_and_calculate_pp(score, session, redis, fetcher)
score.pp = pp
session.add(score)
fetcher: "Fetcher",
user: User,
score: Score,
score_token: int,
beatmap_length: int,
beatmap_status: BeatmapRankStatus,
):
score_id = score.id
user_id = user.id
logger.info(
"[Score] Processing score {score_id} for user {user_id} on beatmap {beatmap_id}",
score_id=score_id,
user_id=user_id,
beatmap_id=score.beatmap_id,
)
await _process_score_pp(score, session, redis, fetcher)
await session.commit()
await session.refresh(score)
if not successed:
await redis.rpush("score:need_recalculate", score.id) # pyright: ignore[reportGeneralTypeIssues]
if can_get_pp and score.pp != 0:
previous_pp_best = await get_user_best_pp_in_beatmap(session, beatmap_id, user_id, score.gamemode)
if previous_pp_best is None or score.pp > previous_pp_best.pp:
best_score = PPBestScore(
user_id=user_id,
score_id=score.id,
beatmap_id=beatmap_id,
gamemode=score.gamemode,
pp=score.pp,
acc=score.accuracy,
)
session.add(best_score)
await session.delete(previous_pp_best) if previous_pp_best else None
await session.commit()
await session.refresh(score)
await session.refresh(score_token)
await session.refresh(user)
await redis.publish("osu-channel:score:processed", f'{{"ScoreId": {score.id}}}')
return score
await _process_statistics(
session,
redis,
user,
score,
score_token,
beatmap_length,
beatmap_status,
)
await redis.publish("osu-channel:score:processed", f'{{"ScoreId": {score_id}}}')
await session.commit()
score_ = (await session.exec(select(Score).where(Score.id == score_id).options(joinedload(Score.beatmap)))).first()
if score_ is None:
logger.warning(
"[Score] Score {score_id} disappeared after commit, skipping event processing",
score_id=score_id,
)
return
await _process_score_events(score_, session)
await session.commit()
logger.info(
"[Score] Finished processing score {score_id} for user {user_id}",
score_id=score_id,
user_id=user_id,
)

View File

@@ -36,6 +36,7 @@ class ScoreToken(ScoreTokenBase, table=True):
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id")))
beatmap_id: int = Field(foreign_key="beatmaps.id")
user: Mapped[User] = Relationship()
beatmap: Mapped[Beatmap] = Relationship()

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager
from contextvars import ContextVar
from datetime import datetime
import json
@@ -64,8 +65,13 @@ async def get_db():
yield session
def with_db():
return AsyncSession(engine)
@asynccontextmanager
async def with_db():
async with AsyncSession(engine) as session:
try:
yield session
finally:
await session.close()
DBFactory = Callable[[], AsyncIterator[AsyncSession]]

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
from datetime import UTC, date
import math
import time
from app.calculator import clamp
@@ -17,10 +16,8 @@ from app.database import (
User,
)
from app.database.achievement import process_achievements
from app.database.best_score import BestScore
from app.database.counts import ReplayWatchedCount
from app.database.daily_challenge import process_daily_challenge_score
from app.database.events import Event, EventType
from app.database.playlist_attempts import ItemAttemptsCount
from app.database.playlist_best_score import (
PlaylistBestScore,
@@ -37,12 +34,13 @@ from app.database.score import (
process_user,
)
from app.dependencies.api_version import APIVersion
from app.dependencies.database import Database, get_redis
from app.dependencies.database import Database, get_redis, with_db
from app.dependencies.fetcher import get_fetcher
from app.dependencies.storage import get_storage_service
from app.dependencies.user import get_client_user, get_current_user
from app.fetcher import Fetcher
from app.log import logger
from app.models.beatmap import BeatmapRankStatus
from app.models.room import RoomCategory
from app.models.score import (
GameMode,
@@ -50,6 +48,7 @@ from app.models.score import (
Rank,
SoloScoreSubmissionInfo,
)
from app.service.beatmap_cache_service import get_beatmap_cache_service
from app.service.user_cache_service import refresh_user_cache_background
from app.storage.base import StorageService
from app.utils import utcnow
@@ -78,16 +77,47 @@ from sqlmodel.ext.asyncio.session import AsyncSession
READ_SCORE_TIMEOUT = 10
async def process_user_achievement(score_id: int):
from app.dependencies.database import engine
from sqlmodel.ext.asyncio.session import AsyncSession
session = AsyncSession(engine)
try:
async def _process_user_achievement(score_id: int):
async with with_db() as session:
await process_achievements(session, get_redis(), score_id)
finally:
await session.close()
async def _process_user(score_id: int, user_id: int, redis: Redis, fetcher: Fetcher):
async with with_db() as session:
user = await session.get(User, user_id)
if not user:
logger.warning(
"User {user_id} not found when processing score {score_id}", user_id=user_id, score_id=score_id
)
return
score = await session.get(Score, score_id)
if not score:
logger.warning(
"Score {score_id} not found when processing user {user_id}", score_id=score_id, user_id=user_id
)
return
score_token = (await session.exec(select(ScoreToken.id).where(ScoreToken.score_id == score_id))).first()
if not score_token:
logger.warning(
"ScoreToken for score {score_id} not found when processing user {user_id}",
score_id=score_id,
user_id=user_id,
)
return
beatmap = (
await session.exec(
select(Beatmap.total_length, Beatmap.beatmap_status).where(Beatmap.id == score.beatmap_id)
)
).first()
if not beatmap:
logger.warning(
"Beatmap {beatmap_id} not found when processing user {user_id} for score {score_id}",
beatmap_id=score.beatmap_id,
user_id=user_id,
score_id=score_id,
)
return
await process_user(session, redis, fetcher, user, score, score_token, beatmap[0], BeatmapRankStatus(beatmap[1]))
async def submit_score(
@@ -124,10 +154,7 @@ async def submit_score(
if not score:
raise HTTPException(status_code=404, detail="Score not found")
else:
# 智能预取beatmap缓存异步进行不阻塞主流程
try:
from app.service.beatmap_cache_service import get_beatmap_cache_service
cache_service = get_beatmap_cache_service(redis, fetcher)
await cache_service.smart_preload_for_score(beatmap)
except Exception as e:
@@ -138,92 +165,30 @@ async def submit_score(
except HTTPError:
raise HTTPException(status_code=404, detail="Beatmap not found")
status = db_beatmap.beatmap_status
beatmap_length = db_beatmap.total_length
score = await process_score(
current_user,
beatmap,
status.has_pp() or settings.enable_all_beatmap_pp,
score_token,
info,
fetcher,
db,
redis,
item_id,
room_id,
)
await db.refresh(current_user)
await db.refresh(score_token)
score_id = score.id
score_token.score_id = score_id
await process_user(db, current_user, score, token, beatmap_length, status)
score = (await db.exec(select(Score).options(joinedload(Score.user)).where(Score.id == score_id))).one()
await db.commit()
await db.refresh(score)
background_task.add_task(_process_user, score_id, user_id, redis, fetcher)
resp: ScoreResp = await ScoreResp.from_db(db, score)
score_gamemode = score.gamemode
total_users = (await db.exec(select(func.count()).select_from(User))).one()
if resp.rank_global is not None and resp.rank_global <= min(math.ceil(float(total_users) * 0.01), 50):
rank_event = Event(
created_at=utcnow(),
type=EventType.RANK,
user_id=score.user_id,
user=score.user,
)
rank_event.event_payload = {
"scorerank": score.rank.value,
"rank": resp.rank_global,
"mode": score.gamemode.readable(),
"beatmap": {
"title": (
f"{score.beatmap.beatmapset.artist} - {score.beatmap.beatmapset.title} [{score.beatmap.version}]"
),
"url": score.beatmap.url.replace("https://osu.ppy.sh/", settings.web_url),
},
"user": {
"username": score.user.username,
"url": settings.web_url + "users/" + str(score.user.id),
},
}
db.add(rank_event)
if resp.rank_global is not None and resp.rank_global == 1:
displaced_score = (
await db.exec(
select(BestScore)
.where(
BestScore.beatmap_id == score.beatmap_id,
BestScore.gamemode == score.gamemode,
)
.order_by(col(BestScore.total_score).desc())
.limit(1)
.offset(1)
)
).first()
if displaced_score and displaced_score.user_id != resp.user_id:
username = (await db.exec(select(User.username).where(User.id == displaced_score.user_id))).one()
rank_lost_event = Event(
created_at=utcnow(),
type=EventType.RANK_LOST,
user_id=displaced_score.user_id,
)
rank_lost_event.event_payload = {
"mode": score.gamemode.readable(),
"beatmap": {
"title": (
f"{score.beatmap.beatmapset.artist} - {score.beatmap.beatmapset.title} "
f"[{score.beatmap.version}]"
),
"url": score.beatmap.url.replace("https://osu.ppy.sh/", settings.web_url),
},
"user": {
"username": username,
"url": settings.web_url + "users/" + str(displaced_score.user.id),
},
}
db.add(rank_lost_event)
await db.commit()
if user_id is not None:
background_task.add_task(refresh_user_cache_background, redis, user_id, score_gamemode)
background_task.add_task(process_user_achievement, resp.id)
background_task.add_task(_process_user_achievement, resp.id)
return resp