Introduces Cloudflare Turnstile verification for registration, OAuth password grant, and password reset endpoints (excluding osu! client). Adds related configuration options and a new service for token validation. Also refactors password change logic to support TOTP or password-based verification, improving security for users with TOTP enabled.
91 lines
3.3 KiB
Python
91 lines
3.3 KiB
Python
"""Cloudflare Turnstile 验证服务
|
||
|
||
负责验证 Cloudflare Turnstile token 的有效性
|
||
"""
|
||
|
||
from app.config import settings
|
||
from app.log import log
|
||
|
||
import httpx
|
||
|
||
logger = log("Turnstile")
|
||
|
||
TURNSTILE_VERIFY_URL = "https://challenges.cloudflare.com/turnstile/v0/siteverify"
|
||
DUMMY_TOKEN = "XXXX.DUMMY.TOKEN.XXXX" # noqa: S105
|
||
|
||
|
||
class TurnstileService:
|
||
"""Cloudflare Turnstile 验证服务"""
|
||
|
||
@staticmethod
|
||
async def verify_token(token: str, remoteip: str | None = None) -> tuple[bool, str]:
|
||
"""验证 Turnstile token
|
||
|
||
Args:
|
||
token: Turnstile 响应 token
|
||
remoteip: 客户端 IP 地址(可选)
|
||
|
||
Returns:
|
||
tuple[bool, str]: (是否成功, 错误消息)
|
||
"""
|
||
# 如果未启用 Turnstile 验证,直接返回成功
|
||
if not settings.enable_turnstile_verification:
|
||
return True, ""
|
||
|
||
# 开发模式:直接跳过验证
|
||
if settings.turnstile_dev_mode:
|
||
logger.debug("Turnstile dev mode enabled, skipping verification")
|
||
return True, ""
|
||
|
||
# 检查是否为 dummy token(仅在开发模式下接受)
|
||
if token == DUMMY_TOKEN:
|
||
logger.warning(f"Dummy token provided but dev mode is disabled (IP: {remoteip})")
|
||
return False, "Invalid verification token"
|
||
|
||
# 检查配置
|
||
if not settings.turnstile_secret_key:
|
||
logger.error("Turnstile secret key not configured")
|
||
return False, "Turnstile verification not configured"
|
||
|
||
# 准备请求数据
|
||
data = {"secret": settings.turnstile_secret_key, "response": token}
|
||
|
||
if remoteip:
|
||
data["remoteip"] = remoteip
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
response = await client.post(TURNSTILE_VERIFY_URL, data=data)
|
||
response.raise_for_status()
|
||
result = response.json()
|
||
|
||
if result.get("success"):
|
||
logger.debug(f"Turnstile verification successful for IP {remoteip}")
|
||
return True, ""
|
||
else:
|
||
error_codes = result.get("error-codes", [])
|
||
logger.warning(f"Turnstile verification failed for IP {remoteip}, errors: {error_codes}")
|
||
|
||
# 根据错误代码提供友好的错误消息
|
||
if "timeout-or-duplicate" in error_codes:
|
||
return False, "Verification token expired or already used"
|
||
elif "invalid-input-response" in error_codes:
|
||
return False, "Invalid verification token"
|
||
elif "missing-input-response" in error_codes:
|
||
return False, "Verification token is required"
|
||
else:
|
||
return False, "Verification failed"
|
||
|
||
except httpx.TimeoutException:
|
||
logger.error("Turnstile verification timeout")
|
||
return False, "Verification service timeout"
|
||
except httpx.HTTPError as e:
|
||
logger.error(f"Turnstile verification HTTP error: {e}")
|
||
return False, "Verification service error"
|
||
except Exception as e: # Catch any unexpected errors
|
||
logger.exception(f"Turnstile verification unexpected error: {e}")
|
||
return False, "Verification service error"
|
||
|
||
|
||
turnstile_service = TurnstileService()
|