refactor(database): use asyncio
This commit is contained in:
140
test_lazer.py
140
test_lazer.py
@@ -12,108 +12,106 @@ import sys
|
||||
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
|
||||
|
||||
from app.database import User
|
||||
from app.dependencies.database import get_db
|
||||
from app.dependencies.database import engine
|
||||
from app.utils import convert_db_user_to_api_user
|
||||
|
||||
from sqlmodel import select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
|
||||
def test_lazer_tables():
|
||||
async def test_lazer_tables():
|
||||
"""测试 lazer 表的基本功能"""
|
||||
print("测试 Lazer API 表支持...")
|
||||
|
||||
# 获取数据库会话
|
||||
db_gen = get_db()
|
||||
db = next(db_gen)
|
||||
async with AsyncSession(engine) as session:
|
||||
async with session.begin():
|
||||
try:
|
||||
# 测试查询用户
|
||||
statement = select(User)
|
||||
result = await session.execute(statement)
|
||||
user = result.scalars().first()
|
||||
if not user:
|
||||
print("❌ 没有找到用户,请先同步数据")
|
||||
return False
|
||||
|
||||
try:
|
||||
# 测试查询用户
|
||||
statement = select(User)
|
||||
user = db.exec(statement).first()
|
||||
if not user:
|
||||
print("❌ 没有找到用户,请先同步数据")
|
||||
return False
|
||||
print(f"✓ 找到用户: {user.name} (ID: {user.id})")
|
||||
|
||||
print(f"✓ 找到用户: {user.name} (ID: {user.id})")
|
||||
# 测试 lazer 资料
|
||||
if user.lazer_profile:
|
||||
print(
|
||||
f"✓ 用户有 lazer 资料: 支持者={user.lazer_profile.is_supporter}"
|
||||
)
|
||||
else:
|
||||
print("⚠ 用户没有 lazer 资料,将使用默认值")
|
||||
|
||||
# 测试 lazer 资料
|
||||
if user.lazer_profile:
|
||||
print(f"✓ 用户有 lazer 资料: 支持者={user.lazer_profile.is_supporter}")
|
||||
else:
|
||||
print("⚠ 用户没有 lazer 资料,将使用默认值")
|
||||
# 测试 lazer 统计
|
||||
osu_stats = None
|
||||
for stat in user.lazer_statistics:
|
||||
if stat.mode == "osu":
|
||||
osu_stats = stat
|
||||
break
|
||||
|
||||
# 测试 lazer 统计
|
||||
osu_stats = None
|
||||
for stat in user.lazer_statistics:
|
||||
if stat.mode == "osu":
|
||||
osu_stats = stat
|
||||
break
|
||||
if osu_stats:
|
||||
print(
|
||||
f"✓ 用户有 osu! 统计: PP={osu_stats.pp}, "
|
||||
f"游戏次数={osu_stats.play_count}"
|
||||
)
|
||||
else:
|
||||
print("⚠ 用户没有 osu! 统计,将使用默认值")
|
||||
|
||||
if osu_stats:
|
||||
print(
|
||||
f"✓ 用户有 osu! 统计: PP={osu_stats.pp}, "
|
||||
f"游戏次数={osu_stats.play_count}"
|
||||
)
|
||||
else:
|
||||
print("⚠ 用户没有 osu! 统计,将使用默认值")
|
||||
# 测试转换为 API 格式
|
||||
api_user = convert_db_user_to_api_user(user, "osu")
|
||||
print("✓ 成功转换为 API 用户格式")
|
||||
print(f" - 用户名: {api_user.username}")
|
||||
print(f" - 国家: {api_user.country_code}")
|
||||
print(f" - PP: {api_user.statistics.pp}")
|
||||
print(f" - 是否支持者: {api_user.is_supporter}")
|
||||
|
||||
# 测试转换为 API 格式
|
||||
api_user = convert_db_user_to_api_user(user, "osu", db)
|
||||
print("✓ 成功转换为 API 用户格式")
|
||||
print(f" - 用户名: {api_user.username}")
|
||||
print(f" - 国家: {api_user.country_code}")
|
||||
print(f" - PP: {api_user.statistics.pp}")
|
||||
print(f" - 是否支持者: {api_user.is_supporter}")
|
||||
return True
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ 测试失败: {e}")
|
||||
import traceback
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 测试失败: {e}")
|
||||
import traceback
|
||||
|
||||
traceback.print_exc()
|
||||
return False
|
||||
finally:
|
||||
db.close()
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
|
||||
def test_authentication():
|
||||
async def test_authentication():
|
||||
"""测试认证功能"""
|
||||
print("\n测试认证功能...")
|
||||
|
||||
db_gen = get_db()
|
||||
db = next(db_gen)
|
||||
async with AsyncSession(engine) as session:
|
||||
async with session.begin():
|
||||
try:
|
||||
# 尝试认证第一个用户
|
||||
statement = select(User)
|
||||
result = await session.execute(statement)
|
||||
user = result.scalars().first()
|
||||
if not user:
|
||||
print("❌ 没有用户进行认证测试")
|
||||
return False
|
||||
|
||||
try:
|
||||
# 尝试认证第一个用户
|
||||
statement = select(User)
|
||||
user = db.exec(statement).first()
|
||||
if not user:
|
||||
print("❌ 没有用户进行认证测试")
|
||||
return False
|
||||
print(f"✓ 测试用户: {user.name}")
|
||||
print("⚠ 注意: 实际密码认证需要正确的密码")
|
||||
|
||||
print(f"✓ 测试用户: {user.name}")
|
||||
print("⚠ 注意: 实际密码认证需要正确的密码")
|
||||
return True
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 认证测试失败: {e}")
|
||||
return False
|
||||
finally:
|
||||
db.close()
|
||||
except Exception as e:
|
||||
print(f"❌ 认证测试失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
async def main():
|
||||
"""主测试函数"""
|
||||
print("Lazer API 系统测试")
|
||||
print("=" * 40)
|
||||
|
||||
# 测试表连接
|
||||
success1 = test_lazer_tables()
|
||||
success1 = await test_lazer_tables()
|
||||
|
||||
# 测试认证
|
||||
success2 = test_authentication()
|
||||
success2 = await test_authentication()
|
||||
|
||||
print("\n" + "=" * 40)
|
||||
if success1 and success2:
|
||||
@@ -130,4 +128,6 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
import asyncio
|
||||
|
||||
asyncio.run(main())
|
||||
|
||||
Reference in New Issue
Block a user