fix(database): fix score database

This commit is contained in:
MingxuanGame
2025-07-26 12:05:54 +08:00
parent 7ea4570c17
commit 585cb9d98a
14 changed files with 90 additions and 107 deletions

View File

@@ -7,17 +7,18 @@ from __future__ import annotations
import asyncio
from datetime import datetime
import random
from app.auth import get_password_hash
from app.database import (
User,
)
from app.database.beatmapset import Beatmapset, BeatmapsetResp
from app.database.beatmap import Beatmap, BeatmapResp
from app.database.beatmap import Beatmap
from app.database.beatmapset import Beatmapset
from app.database.score import Score
from app.models.score import GameMode, Rank, APIMod
from app.models.beatmap import BeatmapRankStatus, Genre, Language
from app.dependencies.database import create_tables, engine
from app.models.beatmap import BeatmapRankStatus, Genre, Language
from app.models.score import APIMod, GameMode, Rank
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -29,8 +30,8 @@ async def create_sample_user():
async with session.begin():
# 检查用户是否已存在
statement = select(User).where(User.name == "Googujiang")
result = await session.execute(statement)
existing_user = result.scalars().first()
result = await session.exec(statement)
existing_user = result.first()
if existing_user:
print("示例用户已存在,跳过创建")
return existing_user
@@ -63,13 +64,6 @@ async def create_sample_user():
)
session.add(user)
await session.commit()
await session.refresh(user)
# 确保用户ID存在
if user.id is None:
raise ValueError("User ID is None after saving to database")
print(f"成功创建示例用户: {user.name} (ID: {user.id})")
print(f"安全用户名: {user.safe_name}")
print(f"邮箱: {user.email}")
@@ -77,14 +71,15 @@ async def create_sample_user():
return user
async def create_sample_beatmap_data(user: User):
async def create_sample_beatmap_data():
"""创建示例谱面数据"""
async with AsyncSession(engine) as session:
async with session.begin():
user_id = random.randint(1, 1000)
# 检查谱面集是否已存在
statement = select(Beatmapset).where(Beatmapset.id == 1)
result = await session.execute(statement)
existing_beatmapset = result.scalars().first()
result = await session.exec(statement)
existing_beatmapset = result.first()
if existing_beatmapset:
print("示例谱面集已存在,跳过创建")
return existing_beatmapset
@@ -106,7 +101,7 @@ async def create_sample_beatmap_data(user: User):
spotlight=False,
title="Example Song",
title_unicode="Example Song",
user_id=user.id,
user_id=user_id,
video=False,
availability_info=None,
download_disabled=False,
@@ -127,7 +122,6 @@ async def create_sample_beatmap_data(user: User):
ratings=[],
)
session.add(beatmapset)
await session.flush()
# 创建谱面
beatmap = Beatmap(
@@ -138,7 +132,7 @@ async def create_sample_beatmap_data(user: User):
difficulty_rating=5.5,
beatmap_status=BeatmapRankStatus.RANKED,
total_length=195,
user_id=user.id,
user_id=user_id,
version="Example Difficulty",
checksum="example_checksum",
current_user_playcount=0,
@@ -158,33 +152,35 @@ async def create_sample_beatmap_data(user: User):
playcount=50,
)
session.add(beatmap)
await session.flush()
# 创建成绩
score = Score(
id=1,
accuracy=0.9876,
map_md5="example_checksum",
user_id=1,
best_id=1,
build_id=None,
classic_total_score=1234567,
ended_at=datetime.now(),
has_replay=True,
max_combo=1100,
mods=[APIMod(acronym="HD"), APIMod(acronym="DT")],
mods=[
APIMod(acronym="HD", settings={}),
APIMod(acronym="DT", settings={}),
],
passed=True,
playlist_item_id=None,
pp=250.5,
preserve=True,
rank=Rank.S,
room_id=None,
ruleset_id=GameMode.OSU,
gamemode=GameMode.OSU,
started_at=datetime.now(),
total_score=1234567,
type="solo_score",
position=None,
beatmap_id=1,
user_id=user.id,
n300=950,
n100=30,
n50=20,
@@ -195,8 +191,6 @@ async def create_sample_beatmap_data(user: User):
nslider_tail_hit=None,
)
session.add(score)
await session.commit()
await session.refresh(beatmapset)
print(f"成功创建示例谱面集: {beatmapset.title} (ID: {beatmapset.id})")
print(f"成功创建示例谱面: {beatmap.version} (ID: {beatmap.id})")
@@ -207,13 +201,14 @@ async def create_sample_beatmap_data(user: User):
async def main():
print("开始创建示例数据...")
await create_tables()
user = await create_sample_user()
await create_sample_beatmap_data(user)
await create_sample_user()
await create_sample_beatmap_data()
print("示例数据创建完成!")
print(f"用户名: {user.name}")
print("密码: password123")
print("现在您可以使用这些凭据来测试API了。")
# print(f"用户名: {user.name}")
# print("密码: password123")
# print("现在您可以使用这些凭据来测试API了。")
await engine.dispose()
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())