Files
g0v0-server/app/router/private/totp.py

145 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
from app.auth import (
check_totp_backup_code,
finish_create_totp_key,
start_create_totp_key,
totp_redis_key,
verify_totp_key_with_replay_protection,
)
from app.const import BACKUP_CODE_LENGTH
from app.database.auth import TotpKeys
from app.database.user import User
from app.dependencies.database import Database, get_redis
from app.dependencies.user import get_client_user
from app.models.totp import FinishStatus, StartCreateTotpKeyResp
from .router import router
from fastapi import Body, Depends, HTTPException, Security
from pydantic import BaseModel
import pyotp
from redis.asyncio import Redis
class TotpStatusResp(BaseModel):
"""TOTP状态响应"""
enabled: bool
created_at: str | None = None
@router.get(
"/totp/status",
name="检查 TOTP 状态",
description="检查当前用户是否已启用 TOTP 双因素验证",
tags=["验证", "g0v0 API"],
response_model=TotpStatusResp,
)
async def get_totp_status(
current_user: User = Security(get_client_user),
):
"""检查用户是否已创建TOTP"""
totp_key = await current_user.awaitable_attrs.totp_key
if totp_key:
return TotpStatusResp(enabled=True, created_at=totp_key.created_at.isoformat())
else:
return TotpStatusResp(enabled=False)
@router.post(
"/totp/create",
name="开始 TOTP 创建流程",
description=(
"开始 TOTP 创建流程\n\n"
"返回 TOTP 密钥和 URI供用户在身份验证器应用中添加账户。\n\n"
"然后将身份验证器应用提供的 TOTP 代码请求 PUT `/api/private/totp/create` 来完成 TOTP 创建流程。\n\n"
"若 5 分钟内未完成或错误 3 次以上则创建流程需要重新开始。"
),
tags=["验证", "g0v0 API"],
response_model=StartCreateTotpKeyResp,
status_code=201,
)
async def start_create_totp(
redis: Redis = Depends(get_redis),
current_user: User = Security(get_client_user),
):
if await current_user.awaitable_attrs.totp_key:
raise HTTPException(status_code=400, detail="TOTP is already enabled for this user")
previous = await redis.hgetall(totp_redis_key(current_user)) # pyright: ignore[reportGeneralTypeIssues]
if previous: # pyright: ignore[reportGeneralTypeIssues]
from app.auth import _generate_totp_account_label, _generate_totp_issuer_name
account_label = _generate_totp_account_label(current_user)
issuer_name = _generate_totp_issuer_name()
return StartCreateTotpKeyResp(
secret=previous["secret"],
uri=pyotp.totp.TOTP(previous["secret"]).provisioning_uri(
name=account_label,
issuer_name=issuer_name,
),
)
return await start_create_totp_key(current_user, redis)
@router.put(
"/totp/create",
name="完成 TOTP 创建流程",
description=(
"完成 TOTP 创建流程,验证用户提供的 TOTP 代码。\n\n"
"- 如果验证成功,启用用户的 TOTP 双因素验证,并返回备份码。\n- 如果验证失败,返回错误信息。"
),
tags=["验证", "g0v0 API"],
response_model=list[str],
status_code=201,
)
async def finish_create_totp(
session: Database,
code: str = Body(..., embed=True, description="用户提供的 TOTP 代码"),
redis: Redis = Depends(get_redis),
current_user: User = Security(get_client_user),
):
status, backup_codes = await finish_create_totp_key(current_user, code, redis, session)
if status == FinishStatus.SUCCESS:
return backup_codes
elif status == FinishStatus.INVALID:
raise HTTPException(status_code=400, detail="No TOTP setup in progress or invalid data")
elif status == FinishStatus.TOO_MANY_ATTEMPTS:
raise HTTPException(status_code=400, detail="Too many failed attempts. Please start over.")
else:
raise HTTPException(status_code=400, detail="Invalid TOTP code")
@router.delete(
"/totp",
name="禁用 TOTP 双因素验证",
description="禁用当前用户的 TOTP 双因素验证",
tags=["验证", "g0v0 API"],
status_code=204,
)
async def disable_totp(
session: Database,
code: str = Body(..., embed=True, description="用户提供的 TOTP 代码或备份码"),
redis: Redis = Depends(get_redis),
current_user: User = Security(get_client_user),
):
totp = await session.get(TotpKeys, current_user.id)
if not totp:
raise HTTPException(status_code=400, detail="TOTP is not enabled for this user")
# 使用防重放保护的TOTP验证或备份码验证
is_totp_valid = False
if len(code) == 6 and code.isdigit():
is_totp_valid = await verify_totp_key_with_replay_protection(current_user.id, totp.secret, code, redis)
elif len(code) == BACKUP_CODE_LENGTH:
is_totp_valid = check_totp_backup_code(totp, code)
if is_totp_valid:
await session.delete(totp)
await session.commit()
else:
raise HTTPException(status_code=400, detail="Invalid TOTP code or backup code")