添加防止重放攻击

This commit is contained in:
咕谷酱
2025-09-24 00:35:57 +08:00
parent 953f33be4f
commit 8054281b15
3 changed files with 73 additions and 9 deletions

View File

@@ -6,7 +6,7 @@ from __future__ import annotations
from typing import Annotated, Literal
from app.auth import check_totp_backup_code, verify_totp_key
from app.auth import check_totp_backup_code, verify_totp_key_with_replay_protection
from app.config import settings
from app.const import BACKUP_CODE_LENGTH
from app.database.auth import TotpKeys
@@ -40,7 +40,11 @@ class SessionReissueResponse(BaseModel):
message: str
class VerifyFailed(Exception): ...
class VerifyFailed(Exception):
def __init__(self, message: str, reason: str | None = None, should_reissue: bool = False):
super().__init__(message)
self.reason = reason
self.should_reissue = should_reissue
@router.post(
@@ -80,28 +84,41 @@ async def verify_session(
try:
totp_key: TotpKeys | None = await current_user.awaitable_attrs.totp_key
if verify_method is None:
verify_method = "totp" if totp_key else "mail"
# 智能选择验证方法参考osu-web实现
# API版本较老或用户未设置TOTP时强制使用邮件验证
if api_version < 20240101 or totp_key is None:
verify_method = "mail"
else:
verify_method = "totp"
await LoginSessionService.set_login_method(user_id, token_id, verify_method, redis)
login_method = verify_method
if verify_method == "totp":
if not totp_key:
# TOTP密钥在验证开始和现在之间被删除参考osu-web的fallback机制
if settings.enable_email_verification:
await LoginSessionService.set_login_method(user_id, token_id, "mail", redis)
await EmailVerificationService.send_verification_email(
db, redis, user_id, current_user.username, current_user.email, ip_address, user_agent
)
verify_method = "mail"
raise VerifyFailed("用户未设置 TOTP已发送邮件验证")
raise VerifyFailed("用户TOTP已被删除已切换到邮件验证")
# 如果未开启邮箱验证,则直接认为认证通过
# 正常不会进入到这里
elif verify_totp_key(totp_key.secret, verification_key):
elif await verify_totp_key_with_replay_protection(user_id, totp_key.secret, verification_key, redis):
pass
elif len(verification_key) == BACKUP_CODE_LENGTH and check_totp_backup_code(totp_key, verification_key):
login_method = "totp_backup_code"
else:
raise VerifyFailed("TOTP 验证失败")
# 记录详细的验证失败原因参考osu-web的错误处理
if len(verification_key) != 6:
raise VerifyFailed("TOTP验证码长度错误应为6位数字", reason="incorrect_length")
elif not verification_key.isdigit():
raise VerifyFailed("TOTP验证码格式错误应为纯数字", reason="incorrect_format")
else:
# 可能是密钥错误或者重放攻击
raise VerifyFailed("TOTP 验证失败,请检查验证码是否正确且未过期", reason="incorrect_key")
else:
success, message = await EmailVerificationService.verify_email_code(db, redis, user_id, verification_key)
if not success:
@@ -127,7 +144,28 @@ async def verify_session(
login_method=login_method,
notes=str(e),
)
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"method": verify_method})
# 构建更详细的错误响应参考osu-web的错误处理
error_response = {
"method": verify_method,
"error": str(e),
}
# 如果有具体的错误原因,添加到响应中
if hasattr(e, 'reason') and e.reason:
error_response["reason"] = e.reason
# 如果需要重新发送邮件验证码
if hasattr(e, 'should_reissue') and e.should_reissue and verify_method == "mail":
try:
await EmailVerificationService.send_verification_email(
db, redis, user_id, current_user.username, current_user.email, ip_address, user_agent
)
error_response["reissued"] = True
except Exception:
pass # 忽略重发邮件失败的错误
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content=error_response)
@router.post(