147 lines
5.0 KiB
Python
147 lines
5.0 KiB
Python
from __future__ import annotations
|
||
|
||
from app.auth import (
|
||
check_totp_backup_code,
|
||
finish_create_totp_key,
|
||
start_create_totp_key,
|
||
totp_redis_key,
|
||
verify_totp_key_with_replay_protection,
|
||
)
|
||
from app.const import BACKUP_CODE_LENGTH
|
||
from app.database.auth import TotpKeys
|
||
from app.database.lazer_user import User
|
||
from app.dependencies.database import Database, get_redis
|
||
from app.dependencies.user import get_client_user
|
||
from app.models.totp import FinishStatus, StartCreateTotpKeyResp
|
||
|
||
from .router import router
|
||
|
||
from fastapi import Body, Depends, HTTPException, Security
|
||
import pyotp
|
||
from pydantic import BaseModel
|
||
from redis.asyncio import Redis
|
||
|
||
|
||
class TotpStatusResp(BaseModel):
|
||
"""TOTP状态响应"""
|
||
enabled: bool
|
||
created_at: str | None = None
|
||
|
||
|
||
@router.get(
|
||
"/totp/status",
|
||
name="检查 TOTP 状态",
|
||
description="检查当前用户是否已启用 TOTP 双因素验证",
|
||
tags=["验证", "g0v0 API"],
|
||
response_model=TotpStatusResp,
|
||
)
|
||
async def get_totp_status(
|
||
current_user: User = Security(get_client_user),
|
||
):
|
||
"""检查用户是否已创建TOTP"""
|
||
totp_key = await current_user.awaitable_attrs.totp_key
|
||
|
||
if totp_key:
|
||
return TotpStatusResp(
|
||
enabled=True,
|
||
created_at=totp_key.created_at.isoformat()
|
||
)
|
||
else:
|
||
return TotpStatusResp(enabled=False)
|
||
|
||
|
||
@router.post(
|
||
"/totp/create",
|
||
name="开始 TOTP 创建流程",
|
||
description=(
|
||
"开始 TOTP 创建流程\n\n"
|
||
"返回 TOTP 密钥和 URI,供用户在身份验证器应用中添加账户。\n\n"
|
||
"然后将身份验证器应用提供的 TOTP 代码请求 PUT `/api/private/totp/create` 来完成 TOTP 创建流程。\n\n"
|
||
"若 5 分钟内未完成或错误 3 次以上则创建流程需要重新开始。"
|
||
),
|
||
tags=["验证", "g0v0 API"],
|
||
response_model=StartCreateTotpKeyResp,
|
||
status_code=201,
|
||
)
|
||
async def start_create_totp(
|
||
redis: Redis = Depends(get_redis),
|
||
current_user: User = Security(get_client_user),
|
||
):
|
||
if await current_user.awaitable_attrs.totp_key:
|
||
raise HTTPException(status_code=400, detail="TOTP is already enabled for this user")
|
||
|
||
previous = await redis.hgetall(totp_redis_key(current_user)) # pyright: ignore[reportGeneralTypeIssues]
|
||
if previous: # pyright: ignore[reportGeneralTypeIssues]
|
||
from app.auth import _generate_totp_account_label, _generate_totp_issuer_name
|
||
|
||
account_label = _generate_totp_account_label(current_user)
|
||
issuer_name = _generate_totp_issuer_name()
|
||
|
||
return StartCreateTotpKeyResp(
|
||
secret=previous["secret"],
|
||
uri=pyotp.totp.TOTP(previous["secret"]).provisioning_uri(
|
||
name=account_label,
|
||
issuer_name=issuer_name,
|
||
),
|
||
)
|
||
return await start_create_totp_key(current_user, redis)
|
||
|
||
|
||
@router.put(
|
||
"/totp/create",
|
||
name="完成 TOTP 创建流程",
|
||
description=(
|
||
"完成 TOTP 创建流程,验证用户提供的 TOTP 代码。\n\n"
|
||
"- 如果验证成功,启用用户的 TOTP 双因素验证,并返回备份码。\n- 如果验证失败,返回错误信息。"
|
||
),
|
||
tags=["验证", "g0v0 API"],
|
||
response_model=list[str],
|
||
status_code=201,
|
||
)
|
||
async def finish_create_totp(
|
||
session: Database,
|
||
code: str = Body(..., embed=True, description="用户提供的 TOTP 代码"),
|
||
redis: Redis = Depends(get_redis),
|
||
current_user: User = Security(get_client_user),
|
||
):
|
||
status, backup_codes = await finish_create_totp_key(current_user, code, redis, session)
|
||
if status == FinishStatus.SUCCESS:
|
||
return backup_codes
|
||
elif status == FinishStatus.INVALID:
|
||
raise HTTPException(status_code=400, detail="No TOTP setup in progress or invalid data")
|
||
elif status == FinishStatus.TOO_MANY_ATTEMPTS:
|
||
raise HTTPException(status_code=400, detail="Too many failed attempts. Please start over.")
|
||
else:
|
||
raise HTTPException(status_code=400, detail="Invalid TOTP code")
|
||
|
||
|
||
@router.delete(
|
||
"/totp",
|
||
name="禁用 TOTP 双因素验证",
|
||
description="禁用当前用户的 TOTP 双因素验证",
|
||
tags=["验证", "g0v0 API"],
|
||
status_code=204,
|
||
)
|
||
async def disable_totp(
|
||
session: Database,
|
||
code: str = Body(..., embed=True, description="用户提供的 TOTP 代码或备份码"),
|
||
redis: Redis = Depends(get_redis),
|
||
current_user: User = Security(get_client_user),
|
||
):
|
||
totp = await session.get(TotpKeys, current_user.id)
|
||
if not totp:
|
||
raise HTTPException(status_code=400, detail="TOTP is not enabled for this user")
|
||
|
||
# 使用防重放保护的TOTP验证或备份码验证
|
||
is_totp_valid = False
|
||
if len(code) == 6 and code.isdigit():
|
||
is_totp_valid = await verify_totp_key_with_replay_protection(current_user.id, totp.secret, code, redis)
|
||
elif len(code) == BACKUP_CODE_LENGTH:
|
||
is_totp_valid = check_totp_backup_code(totp, code)
|
||
|
||
if is_totp_valid:
|
||
await session.delete(totp)
|
||
await session.commit()
|
||
else:
|
||
raise HTTPException(status_code=400, detail="Invalid TOTP code or backup code")
|