Files
g0v0-server/app/router/v2/session_verify.py
2025-08-22 08:21:52 +00:00

181 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
会话验证路由 - 实现类似 osu! 的邮件验证流程 (API v2)
"""
from __future__ import annotations
from typing import Annotated
from app.database import User
from app.dependencies import get_current_user
from app.dependencies.database import Database, get_redis
from app.dependencies.geoip import GeoIPHelper, get_geoip_helper
from app.service.email_verification_service import (
EmailVerificationService,
LoginSessionService,
)
from app.service.login_log_service import LoginLogService
from .router import router
from fastapi import Depends, Form, HTTPException, Request, Security, status
from fastapi.responses import Response
from pydantic import BaseModel
from redis.asyncio import Redis
class SessionReissueResponse(BaseModel):
    """Response body for a request to re-send the e-mail verification code."""

    # Whether the re-send was reported as successful.
    success: bool
    # Human-readable status or error text accompanying the result.
    message: str
@router.post(
    "/session/verify",
    name="验证会话",
    description="验证邮件验证码并完成会话认证",
    status_code=204,
)
async def verify_session(
    request: Request,
    db: Database,
    redis: Annotated[Redis, Depends(get_redis)],
    verification_key: str = Form(..., description="8位邮件验证码"),
    current_user: User = Security(get_current_user),
) -> Response:
    """Verify an e-mailed verification code and complete session authentication.

    Counterpart of osu!'s ``session/verify`` endpoint.

    Args:
        request: Incoming request, forwarded to the login-log service.
        db: Database session dependency.
        redis: Redis client dependency.
        verification_key: Verification code submitted as a form field.
        current_user: The authenticated user resolved by ``get_current_user``.

    Returns:
        An empty ``204 No Content`` response when the code is accepted.

    Raises:
        HTTPException: ``401 Unauthorized`` when the current user has no id,
            when the code is rejected (``detail`` carries the message returned
            by the verification service), when a ``ValueError`` is raised while
            processing, or when any other unexpected error occurs.
    """
    try:
        user_id = current_user.id
        if not user_id:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户未认证")

        success, message = await EmailVerificationService.verify_code(db, redis, user_id, verification_key)

        if not success:
            # Record the failed attempt before rejecting the request.
            await LoginLogService.record_failed_login(
                db=db,
                request=request,
                attempted_username=current_user.username,
                login_method="email_verification",
                notes=f"邮件验证失败: {message}",
            )
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=message)

        # Record the successful verification, then answer with an empty 204.
        await LoginLogService.record_login(
            db=db,
            user_id=user_id,
            request=request,
            login_method="email_verification",
            login_success=True,
            notes="邮件验证成功",
        )
        return Response(status_code=status.HTTP_204_NO_CONTENT)
    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the
        # generic handler below would replace the specific ``detail`` of the
        # 401s raised above with the catch-all error message.
        raise
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的用户会话") from exc
    except Exception as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="验证过程中发生错误") from exc
@router.post(
    "/session/verify/reissue",
    name="重新发送验证码",
    description="重新发送邮件验证码",
    response_model=SessionReissueResponse,
)
async def reissue_verification_code(
    request: Request,
    db: Database,
    redis: Annotated[Redis, Depends(get_redis)],
    current_user: User = Security(get_current_user),
) -> SessionReissueResponse:
    """Re-send the e-mail verification code to the authenticated user.

    Counterpart of osu!'s ``session/verify/reissue`` endpoint.

    Args:
        request: Incoming request; supplies the client IP and User-Agent.
        db: Database session dependency.
        redis: Redis client dependency.
        current_user: The authenticated user resolved by ``get_current_user``.

    Returns:
        A ``SessionReissueResponse`` carrying the ``(success, message)`` pair
        reported by the verification service. Failures are reported in the
        body rather than raised: ``success`` is ``False`` with a fixed message
        when the user has no id, when a ``ValueError`` is raised, or when any
        other exception occurs.
    """
    try:
        from app.dependencies.geoip import get_client_ip

        client_ip = get_client_ip(request)
        agent = request.headers.get("User-Agent", "Unknown")

        # Identity comes from the already-authenticated user.
        uid = current_user.id
        if not uid:
            return SessionReissueResponse(success=False, message="用户未认证")

        # Build the service coroutine first, then await its result pair.
        pending = EmailVerificationService.resend_verification_code(
            db,
            redis,
            uid,
            current_user.username,
            current_user.email,
            client_ip,
            agent,
        )
        sent, detail = await pending
        return SessionReissueResponse(success=sent, message=detail)
    except ValueError:
        return SessionReissueResponse(success=False, message="无效的用户会话")
    except Exception:
        return SessionReissueResponse(success=False, message="重新发送过程中发生错误")
@router.post(
    "/session/check-new-location",
    name="检查新位置登录",
    description="检查登录是否来自新位置(内部接口)",
)
async def check_new_location(
    request: Request,
    db: Database,
    user_id: int,
    geoip: GeoIPHelper = Depends(get_geoip_helper),
):
    """Report whether the requesting client IP is a new login location for a user.

    Described by its authors as an internal endpoint used during login to
    decide whether e-mail verification is required.

    Args:
        request: Incoming request; supplies the client IP.
        db: Database session dependency.
        user_id: Id of the user whose login history is checked.
        geoip: GeoIP helper used to look up the country of the client IP.

    Returns:
        On success, a dict with ``is_new_location``, ``ip_address`` and
        ``country_code``. If any exception occurs, a dict with
        ``is_new_location`` set to ``True`` and ``error`` set to the
        exception text.
    """
    try:
        from app.dependencies.geoip import get_client_ip

        client_ip = get_client_ip(request)
        # "XX" is the fallback when the lookup result has no "country_iso" key.
        country = geoip.lookup(client_ip).get("country_iso", "XX")
        new_location = await LoginSessionService.check_new_location(db, user_id, client_ip, country)
    except Exception as exc:
        # Any failure is treated as a new location.
        # NOTE(review): the raw exception text is returned to the caller —
        # confirm this endpoint is not reachable by untrusted clients.
        return {
            "is_new_location": True,
            "error": str(exc),
        }

    return {
        "is_new_location": new_location,
        "ip_address": client_ip,
        "country_code": country,
    }