Files
g0v0-server/app/calculator.py
MingxuanGame 33f321952d feat(custom-rulesets): support custom rulesets (#23)
* feat(custom_ruleset): add custom rulesets support

* feat(custom-ruleset): add version check

* feat(custom-ruleset): add LegacyIO API to get ruleset hashes

* feat(pp): add check for rulesets whose pp cannot be calculated

* docs(readme): update README to include support for custom rulesets

* fix(custom-ruleset): make `rulesets` empty instead of throwing an error when version check is disabled

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* chore(custom-ruleset): apply the latest changes of generator

c891bcd159

and

e25041ad3b

* feat(calculator): add fallback performance calculation for unsupported modes

* fix(calculator): remove debug print

* fix: resolve reviews

* feat(calculator): add difficulty calculation checks

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-10-26 21:10:36 +08:00

407 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from enum import Enum
import importlib
import math
from typing import TYPE_CHECKING
from app.calculators.performance import PerformanceCalculator
from app.config import settings
from app.log import log
from app.models.score import GameMode
from osupyparser import HitObject, OsuFile
from osupyparser.osu.objects import Slider
from redis.asyncio import Redis
from sqlmodel import col, exists, select
from sqlmodel.ext.asyncio.session import AsyncSession
if TYPE_CHECKING:
from app.database.score import Score
from app.fetcher import Fetcher
# Logger used by this module, created with the name "Calculator".
logger = log("Calculator")
# Module-wide performance calculator instance. It stays None until init_calculator()
# assigns it; get_calculator() raises RuntimeError while it is still None.
CALCULATOR: PerformanceCalculator | None = None
async def init_calculator():
    """Load the configured performance calculator and store it in ``CALCULATOR``.

    The backend module is imported from
    ``app.calculators.performance.<settings.calculator>``. Its
    ``PerformanceCalculator`` is built from ``settings.calculator_config`` and
    then initialised through its awaitable ``init()``.

    Returns:
        The object now stored in the module-level ``CALCULATOR``.

    Raises:
        ImportError: If an ``ImportError`` or ``AttributeError`` is raised while
            importing, constructing or initialising the backend.
    """
    global CALCULATOR
    try:
        backend_module = importlib.import_module(f"app.calculators.performance.{settings.calculator}")
        calculator_factory = backend_module.PerformanceCalculator
        # The global is assigned before init() is awaited, so it remains set
        # even if init() raises.
        CALCULATOR = calculator_factory(**settings.calculator_config)
        if CALCULATOR is not None:
            await CALCULATOR.init()
    except (ImportError, AttributeError) as exc:
        raise ImportError(f"Failed to import performance calculator for {settings.calculator}") from exc
    return CALCULATOR
def get_calculator() -> PerformanceCalculator:
    """Return the module-level calculator.

    Raises:
        RuntimeError: If ``CALCULATOR`` is still ``None``.
    """
    if CALCULATOR is not None:
        return CALCULATOR
    raise RuntimeError("Performance calculator is not initialized")
def clamp[T: int | float](n: T, min_value: T, max_value: T) -> T:
if n < min_value:
return min_value
elif n > max_value:
return max_value
else:
return n
def calculate_pp_for_no_calculator(score: "Score", star_rating: float) -> float:
    """Estimate pp from total score and star rating when no calculator supports the mode.

    The curve peaks at ``1.4 * star_rating ** 2.8`` for a total score of
    1,000,000. Below a star-rating-dependent knee the result grows linearly
    with the score ratio; from the knee upwards it follows an exponential ramp.
    The knee moves from 0.95 (star rating <= 1) down to 0.62 (star rating >= 8).

    Args:
        score: Score whose ``total_score`` is read.
        star_rating: Star rating of the played beatmap.

    Returns:
        The estimated pp value.
    """
    # TODO: Improve this algorithm
    # https://www.desmos.com/calculator/i2aa7qm3o6
    steepness = 4.0
    max_pp = 1.4 * (star_rating**2.8)
    knee = 0.95 - 0.33 * ((clamp(star_rating, 1, 8) - 1) / 7)
    score_ratio = score.total_score / 1000000

    if score_ratio < knee:
        # Linear section
        return max_pp * score_ratio

    # Exponential reward section: rescale the part above the knee to [0, 1]
    progress = (score_ratio - knee) / (1 - knee)
    ramp = (math.exp(steepness * progress) - 1) / (math.exp(steepness) - 1)
    return max_pp * (knee + (1 - knee) * ramp)
async def calculate_pp(score: "Score", beatmap: str, session: AsyncSession) -> float:
    """Calculate the pp of a score on the given raw beatmap.

    When ``settings.suspicious_score_check`` is on, the beatmap is first looked
    up in ``BannedBeatmaps`` and then inspected with ``is_suspicious_beatmap``.
    A newly detected suspicious beatmap is added to the session as banned; no
    commit happens in this function. A failure of the inspection itself is
    logged and otherwise ignored.

    Args:
        score: The score to evaluate.
        beatmap: Raw beatmap file content.
        session: Database session used for the ban lookup and the new ban row.

    Returns:
        The pp value, or 0 when the beatmap is banned or suspicious, when the
        mode is unsupported and the fallback is disabled, or when the result
        exceeds 3000 while the suspicious-score check is enabled.
    """
    from app.database.beatmap import BannedBeatmaps

    if settings.suspicious_score_check:
        ban_lookup = select(exists()).where(col(BannedBeatmaps.beatmap_id) == score.beatmap_id)
        already_banned = (await session.exec(ban_lookup)).first()
        if already_banned:
            return 0
        try:
            if is_suspicious_beatmap(beatmap):
                session.add(BannedBeatmaps(beatmap_id=score.beatmap_id))
                logger.warning(f"Beatmap {score.beatmap_id} is suspicious, banned")
                return 0
        except Exception:
            logger.exception(f"Error checking if beatmap {score.beatmap_id} is suspicious")

    calculator = get_calculator()
    if await calculator.can_calculate_performance(score.gamemode):
        performance = await calculator.calculate_performance(beatmap, score)
        pp = performance.pp
    elif not settings.fallback_no_calculator_pp:
        return 0
    else:
        # Fallback estimate: prefer a freshly calculated star rating and use the
        # rating stored on the beatmap row when none is available.
        star_rating = -1
        if await calculator.can_calculate_difficulty(score.gamemode):
            difficulty = await calculator.calculate_difficulty(beatmap, score.mods, score.gamemode)
            star_rating = difficulty.star_rating
        if star_rating < 0:
            star_rating = (await score.awaitable_attrs.beatmap).difficulty_rating
        pp = calculate_pp_for_no_calculator(score, star_rating)

    if settings.suspicious_score_check and (pp > 3000):
        logger.warning(
            f"User {score.user_id} played {score.beatmap_id} "
            f"with {pp=} "
            f"acc={score.accuracy}. The score is suspicious and return 0pp"
            f"({score.id=})"
        )
        return 0
    return pp
async def pre_fetch_and_calculate_pp(
    score: "Score", session: AsyncSession, redis: Redis, fetcher: "Fetcher"
) -> tuple[float, bool]:
    """
    Fetch the raw beatmap of a score through the fetcher, then calculate its pp.

    Args:
        score: The score to evaluate.
        session: Database session used for the ban lookup and the new ban row.
        redis: Redis client handed to ``fetcher.get_or_fetch_beatmap_raw``.
        fetcher: Object providing ``get_or_fetch_beatmap_raw``.

    Returns:
        ``(pp, fetched)``. ``fetched`` is ``False`` when the function returned
        before obtaining the beatmap file (beatmap already banned, or the fetch
        failed) and ``True`` otherwise. ``pp`` is 0 in both early-exit cases and
        when the beatmap is found to be suspicious.

    NOTE(review): ``calculate_pp`` repeats the ban lookup and the suspicious
    check when ``settings.suspicious_score_check`` is on, so that work is done
    twice on this path.
    """
    from app.database.beatmap import BannedBeatmaps

    beatmap_id = score.beatmap_id

    # Quick check: skip everything if the beatmap is already banned
    if settings.suspicious_score_check:
        beatmap_banned = (
            await session.exec(select(exists()).where(col(BannedBeatmaps.beatmap_id) == beatmap_id))
        ).first()
        if beatmap_banned:
            return 0, False

    # Fetch the raw beatmap file asynchronously
    try:
        beatmap_raw = await fetcher.get_or_fetch_beatmap_raw(redis, beatmap_id)
    except Exception as e:
        logger.error(f"Failed to fetch beatmap {beatmap_id}: {e}")
        return 0, False

    # With the file available, check whether the beatmap is suspicious
    if settings.suspicious_score_check:
        try:
            # Run the check in a worker thread. asyncio.to_thread submits to the
            # running loop's default executor, replacing the previous
            # get_event_loop() + run_in_executor(None, ...) pair.
            is_sus = await asyncio.to_thread(is_suspicious_beatmap, beatmap_raw)
            if is_sus:
                session.add(BannedBeatmaps(beatmap_id=beatmap_id))
                logger.warning(f"Beatmap {beatmap_id} is suspicious, banned")
                return 0, True
        except Exception:
            logger.exception(f"Error checking if beatmap {beatmap_id} is suspicious")

    # Delegate the actual pp calculation
    return await calculate_pp(score, beatmap_raw, session), True
# https://osu.ppy.sh/wiki/Gameplay/Score/Total_score
def calculate_level_to_score(n: int) -> float:
    """Return the total score associated with level ``n``.

    Levels above 100 grow linearly by 99,999,999,999 per level. Levels up to
    100 use a cubic polynomial plus an exponential term.
    """
    if n > 100:
        return 26931190827 + 99999999999 * (n - 100)

    polynomial_part = 5000 / 3 * (4 * n**3 - 3 * n**2 - n)
    exponential_part = 1.25 * 1.8 ** (n - 60)
    return polynomial_part + exponential_part
# https://github.com/ppy/osu-queue-score-statistics/blob/4bdd479530408de73f3cdd95e097fe126772a65b/osu.Server.Queues.ScoreStatisticsProcessor/Processors/TotalScoreProcessor.cs#L70-L116
# Score needed to advance from one level to the next; entry 0 is the step from
# level 1 to level 2. The final 16 entries are all 99,999,999,999.
_TO_NEXT_LEVEL = (
    30000, 100000, 210000, 360000, 550000,
    780000, 1050000, 1360000, 1710000, 2100000,
    2530000, 3000000, 3510000, 4060000, 4650000,
    5280000, 5950000, 6660000, 7410000, 8200000,
    9030000, 9900000, 10810000, 11760000, 12750000,
    13780000, 14850000, 15960000, 17110000, 18300000,
    19530000, 20800000, 22110000, 23460000, 24850000,
    26280000, 27750000, 29260000, 30810000, 32400000,
    34030000, 35700000, 37410000, 39160000, 40950000,
    42780000, 44650000, 46560000, 48510000, 50500000,
    52530000, 54600000, 56710000, 58860000, 61050000,
    63280000, 65550000, 67860000, 70210001, 72600001,
    75030002, 77500003, 80010006, 82560010, 85150019,
    87780034, 90450061, 93160110, 95910198, 98700357,
    101530643, 104401157, 107312082, 110263748, 113256747,
    116292144, 119371859, 122499346, 125680824, 128927482,
    132259468, 135713043, 139353477, 143298259, 147758866,
    153115959, 160054726, 169808506, 184597311, 208417160,
    248460887, 317675597, 439366075, 655480935, 1041527682,
    1733419828, 2975801691, 5209033044, 9225761479,
) + (99999999999,) * 16


def calculate_score_to_level(total_score: int) -> float:
    """Convert a total score into a fractional level.

    The score is spent level by level using the per-level requirements in
    ``_TO_NEXT_LEVEL``; once the table is exhausted its last entry is reused.
    A partially completed level contributes its completed fraction.

    Returns:
        The level as a float; a non-positive score gives 1.0.
    """
    final_index = len(_TO_NEXT_LEVEL) - 1
    gained = 0.0
    unspent = total_score
    while unspent > 0:
        # `gained` is a whole number here: a fractional step always leaves
        # `unspent` negative and ends the loop.
        requirement = _TO_NEXT_LEVEL[min(final_index, round(gained))]
        fraction = unspent / requirement
        gained += 1 if fraction >= 1 else fraction
        unspent -= requirement
    return gained + 1
# https://osu.ppy.sh/wiki/Performance_points/Weighting_system
def calculate_pp_weight(index: int) -> float:
    """Return the weighting factor ``0.95 ** index``; index 0 has weight 1."""
    decay_base = 0.95
    return math.pow(decay_base, index)
def calculate_weighted_pp(pp: float, index: int) -> float:
    """Scale ``pp`` by the weight for ``index``; anything not above zero gives 0.0."""
    if pp > 0:
        return calculate_pp_weight(index) * pp
    return 0.0
def calculate_weighted_acc(acc: float, index: int) -> float:
    """Scale ``acc`` by the weight for ``index``; anything not above zero gives 0.0."""
    if acc > 0:
        return calculate_pp_weight(index) * acc
    return 0.0
# The approach is loosely based on https://github.com/MaxOhn/rosu-pp/blob/main/src/model/beatmap/suspicious.rs
class Threshold(int, Enum):
    """Limits used by the suspicious-beatmap checks in this module."""

    # Beatmap anomaly constants
    NOTES_THRESHOLD = 500_000  # object count limit for every mode except taiko
    TAIKO_THRESHOLD = 30_000  # object count limit for taiko
    NOTES_PER_1S_THRESHOLD = 200  # 3000 BPM
    NOTES_PER_10S_THRESHOLD = 500  # 600 BPM
    # This is already 4 times the size of the regular play area ...
    # revisit if it turns out to be unsuitable.
    NOTE_POSX_THRESHOLD = 512  # x: [-512,512]
    NOTE_POSY_THRESHOLD = 384  # y: [-384,384]
    # Reject the map when more than this many objects (slider control points
    # included) have a problematic position.
    POS_ERROR_THRESHOLD = 1280 * 50
    SLIDER_REPEAT_THRESHOLD = 5_000
def too_dense(hit_objects: list[HitObject], per_1s: int, per_10s: int) -> bool:
    """Report whether the hit objects are packed more tightly than allowed.

    The map is too dense when any object and the one ``per_1s`` positions later
    start less than 1000 ms apart, or when any object and the one ``per_10s``
    positions later start less than 10000 ms apart.

    The two windows are checked independently. Previously the 10-second check
    only ran when the 1-second lookahead was out of range, so it could never
    trigger whenever ``per_10s >= per_1s``.

    Args:
        hit_objects: Hit objects exposing ``start_time`` in milliseconds,
            presumably in chronological order (not verified here).
        per_1s: Object lookahead for the 1-second window; values below 1 are
            treated as 1.
        per_10s: Object lookahead for the 10-second window; values below 1 are
            treated as 1.

    Returns:
        True if either window is violated, False otherwise.
    """
    per_1s = max(1, per_1s)
    per_10s = max(1, per_10s)
    # (lookahead in objects, window length in milliseconds)
    windows = ((per_1s, 1000), (per_10s, 10000))
    for lookahead, window_ms in windows:
        # Pair every object with the one `lookahead` positions after it; zip
        # stops once the lookahead would run past the end of the list.
        for first, last in zip(hit_objects, hit_objects[lookahead:]):
            if last.start_time - first.start_time < window_ms:
                return True
    return False
def slider_is_sus(hit_objects: list[HitObject]) -> bool:
    """Report whether any slider has too many repeats or an out-of-bounds position.

    Only ``Slider`` instances are inspected. A slider is flagged when its
    ``repeat_count`` exceeds ``Threshold.SLIDER_REPEAT_THRESHOLD``, or when its
    own position or any of its ``points`` lies outside ``[0, NOTE_POSX_THRESHOLD]``
    horizontally or ``[0, NOTE_POSY_THRESHOLD]`` vertically.

    NOTE(review): the comments on ``Threshold`` describe the allowed ranges as
    ``[-512,512]`` / ``[-384,384]``, but this code treats every negative
    coordinate as out of bounds. ``Threshold.POS_ERROR_THRESHOLD`` is not
    consulted either: a single bad position is enough. Confirm which is intended.
    """
    x_limit = Threshold.NOTE_POSX_THRESHOLD
    y_limit = Threshold.NOTE_POSY_THRESHOLD

    def _out_of_bounds(position) -> bool:
        return position.x > x_limit or position.x < 0 or position.y > y_limit or position.y < 0

    for obj in hit_objects:
        if not isinstance(obj, Slider):
            continue
        too_many_repeats = obj.repeat_count > Threshold.SLIDER_REPEAT_THRESHOLD
        # Count the slider head together with every control point.
        bad_positions = sum(1 for position in (obj.pos, *obj.points) if _out_of_bounds(position))
        if bad_positions or too_many_repeats:
            return True
    return False
def is_2b(hit_objects: list[HitObject]) -> bool:
    """Report whether two consecutive hit objects start at exactly the same time.

    The earlier version compared a hit object itself with the next object's
    ``start_time``; the comparison must be between the two ``start_time`` values.

    Args:
        hit_objects: Hit objects exposing ``start_time``.

    Returns:
        True if any adjacent pair shares a start time; False otherwise,
        including for lists with fewer than two objects.
    """
    return any(
        current.start_time == following.start_time
        for current, following in zip(hit_objects, hit_objects[1:])
    )
def is_suspicious_beatmap(content: str) -> bool:
    """Parse a raw beatmap and decide whether it looks abusive.

    Checks, in order:
      1. more than 24 hours between the first and last hit object;
      2. more hit objects than the limit for the mode (taiko has its own limit);
      3. mode-specific checks: density, slider sanity and simultaneous objects.

    The mode-specific branch compares ``osufile.mode`` with ``int(GameMode.X)``
    using ``==``. It used to be ``match``/``case int(GameMode.X)``, but that is
    a class pattern: it tests ``isinstance(mode, int)`` and then compares the
    mode with the raw enum member rather than with its integer value.

    NOTE(review): if the previous patterns never matched for the project's
    ``GameMode``, this change puts the mode-specific checks into effect for the
    first time; review their thresholds before deploying.

    Args:
        content: Raw beatmap file text.

    Returns:
        True if the beatmap is considered suspicious. A beatmap with no hit
        objects is reported as not suspicious instead of raising ``IndexError``.
    """
    osufile = OsuFile(content=content.encode("utf-8")).parse_file()
    hit_objects = osufile.hit_objects
    if not hit_objects:
        # Nothing to measure; the first/last lookup below would fail.
        return False

    if hit_objects[-1].start_time - hit_objects[0].start_time > 24 * 60 * 60 * 1000:
        return True

    mode = osufile.mode
    is_taiko = mode == int(GameMode.TAIKO)
    object_limit = Threshold.TAIKO_THRESHOLD if is_taiko else Threshold.NOTES_THRESHOLD
    if len(hit_objects) > object_limit:
        return True

    if mode == int(GameMode.OSU):
        return (
            too_dense(
                hit_objects,
                Threshold.NOTES_PER_1S_THRESHOLD,
                Threshold.NOTES_PER_10S_THRESHOLD,
            )
            or slider_is_sus(hit_objects)
            or is_2b(hit_objects)
        )
    if is_taiko:
        return too_dense(
            hit_objects,
            Threshold.NOTES_PER_1S_THRESHOLD * 2,
            Threshold.NOTES_PER_10S_THRESHOLD * 2,
        ) or is_2b(hit_objects)
    if mode == int(GameMode.FRUITS):
        return slider_is_sus(hit_objects) or is_2b(hit_objects)
    if mode == int(GameMode.MANIA):
        # Scale the density limits by half of the parsed `cs` value (at least 1);
        # for mania this is presumably the key count - confirm.
        keys_per_hand = max(1, int(osufile.cs / 2))
        per_1s = Threshold.NOTES_PER_1S_THRESHOLD * keys_per_hand
        per_10s = Threshold.NOTES_PER_10S_THRESHOLD * keys_per_hand
        return too_dense(hit_objects, per_1s, per_10s)
    return False