chore(linter): make linter happy

This commit is contained in:
MingxuanGame
2025-09-30 07:57:08 +00:00
parent 0f637446df
commit 017b058e63
15 changed files with 99 additions and 120 deletions

View File

@@ -6,20 +6,21 @@
from __future__ import annotations
import asyncio
from typing import Awaitable, Callable, Optional
from collections.abc import Awaitable, Callable
from typing import ClassVar, Literal, cast
from app.database.lazer_user import User
from app.database.verification import LoginSession
from app.dependencies.database import get_redis, with_db
from app.log import logger
from app.service.verification_service import LoginSessionService
from app.utils import bg_tasks
from fastapi import HTTPException, Request, Response, status
from fastapi.responses import JSONResponse
from redis.asyncio import Redis
from sqlmodel.ext.asyncio.session import AsyncSession
from app.database.lazer_user import User
from app.database.verification import LoginSession
from app.dependencies.database import with_db, get_redis
from app.log import logger
from app.service.verification_service import LoginSessionService
class SessionVerificationState:
"""会话验证状态管理类
@@ -39,7 +40,7 @@ class SessionVerificationState:
db: AsyncSession,
redis: Redis,
user: User,
) -> Optional[SessionVerificationState]:
) -> SessionVerificationState | None:
"""获取当前会话验证状态"""
try:
# 从请求头或token中获取会话信息
@@ -58,7 +59,7 @@ class SessionVerificationState:
return None
@staticmethod
def _extract_session_token(request: Request) -> Optional[str]:
def _extract_session_token(request: Request) -> str | None:
"""从请求中提取会话token"""
# 尝试从Authorization header提取
auth_header = request.headers.get("Authorization", "")
@@ -79,11 +80,11 @@ class SessionVerificationState:
# 智能选择验证方法
# 参考osu-web: API版本 < 20250913 或用户没有TOTP时使用邮件验证
# 这里简化为检查用户是否有TOTP
totp_key = getattr(self.user, 'totp_key', None)
current_method = 'totp' if totp_key else 'mail'
totp_key = getattr(self.user, "totp_key", None)
current_method = "totp" if totp_key else "mail"
# 设置验证方法
asyncio.create_task(self._set_verification_method(current_method))
bg_tasks.add_task(self._set_verification_method, current_method)
return current_method
@@ -91,11 +92,14 @@ class SessionVerificationState:
"""内部方法:设置验证方法"""
try:
token_id = self.session.token_id
if token_id is not None and method in ['totp', 'mail']:
if token_id is not None and method in ["totp", "mail"]:
# 类型检查确保method是正确的字面量类型
verification_method = method if method in ['totp', 'mail'] else 'totp'
verification_method = method if method in ["totp", "mail"] else "totp"
await LoginSessionService.set_login_method(
self.user.id, token_id, verification_method, self.redis # type: ignore
self.user.id,
token_id,
cast(Literal["totp", "mail"], verification_method),
self.redis,
)
except Exception as e:
logger.error(f"[Session Verification] Error setting verification method: {e}")
@@ -112,9 +116,7 @@ class SessionVerificationState:
try:
token_id = self.session.token_id
if token_id is not None:
await LoginSessionService.mark_session_verified(
db, self.redis, self.user.id, token_id
)
await LoginSessionService.mark_session_verified(db, self.redis, self.user.id, token_id)
finally:
await db.close()
except Exception as e:
@@ -142,8 +144,7 @@ class SessionVerificationState:
db = with_db()
try:
await EmailVerificationService.send_verification_email(
db, self.redis, self.user.id, self.user.username,
self.user.email, None, None
db, self.redis, self.user.id, self.user.username, self.user.email, None, None
)
finally:
await db.close()
@@ -158,7 +159,7 @@ class SessionVerificationController:
"""
# 需要跳过验证的路由参考osu-web的SKIP_VERIFICATION_ROUTES
SKIP_VERIFICATION_ROUTES = {
SKIP_VERIFICATION_ROUTES: ClassVar[set[str]] = {
"/api/v2/session/verify",
"/api/v2/session/verify/reissue",
"/api/v2/me",
@@ -190,26 +191,18 @@ class SessionVerificationController:
# API请求返回JSON响应
if request.url.path.startswith("/api/"):
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"method": method}
)
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"method": method})
# 其他情况可以扩展支持HTML响应
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={
"authentication": "verify",
"method": method,
"message": "Session verification required"
}
content={"authentication": "verify", "method": method, "message": "Session verification required"},
)
except Exception as e:
logger.error(f"[Session Verification] Error initiating verification: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Verification initiation failed"
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Verification initiation failed"
)
@@ -262,7 +255,7 @@ class SessionVerificationMiddleware:
# 出错时允许请求继续,避免阻塞正常流程
return await call_next(request)
async def _get_user(self, request: Request) -> Optional[User]:
async def _get_user(self, request: Request) -> User | None:
"""获取当前用户"""
try:
# 这里需要手动获取用户,因为在中间件中无法直接使用依赖注入