Files
g0v0-server/app/service/turnstile_service.py
咕谷酱 73d25c7604 Add Cloudflare Turnstile verification to auth flows
Introduces Cloudflare Turnstile verification for registration, OAuth password grant, and password reset endpoints (excluding osu! client). Adds related configuration options and a new service for token validation. Also refactors password change logic to support TOTP or password-based verification, improving security for users with TOTP enabled.
2025-10-12 02:39:46 +08:00

91 lines
3.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Cloudflare Turnstile 验证服务
负责验证 Cloudflare Turnstile token 的有效性
"""
from app.config import settings
from app.log import log
import httpx
logger = log("Turnstile")
TURNSTILE_VERIFY_URL = "https://challenges.cloudflare.com/turnstile/v0/siteverify"
DUMMY_TOKEN = "XXXX.DUMMY.TOKEN.XXXX" # noqa: S105
class TurnstileService:
"""Cloudflare Turnstile 验证服务"""
@staticmethod
async def verify_token(token: str, remoteip: str | None = None) -> tuple[bool, str]:
"""验证 Turnstile token
Args:
token: Turnstile 响应 token
remoteip: 客户端 IP 地址(可选)
Returns:
tuple[bool, str]: (是否成功, 错误消息)
"""
# 如果未启用 Turnstile 验证,直接返回成功
if not settings.enable_turnstile_verification:
return True, ""
# 开发模式:直接跳过验证
if settings.turnstile_dev_mode:
logger.debug("Turnstile dev mode enabled, skipping verification")
return True, ""
# 检查是否为 dummy token仅在开发模式下接受
if token == DUMMY_TOKEN:
logger.warning(f"Dummy token provided but dev mode is disabled (IP: {remoteip})")
return False, "Invalid verification token"
# 检查配置
if not settings.turnstile_secret_key:
logger.error("Turnstile secret key not configured")
return False, "Turnstile verification not configured"
# 准备请求数据
data = {"secret": settings.turnstile_secret_key, "response": token}
if remoteip:
data["remoteip"] = remoteip
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(TURNSTILE_VERIFY_URL, data=data)
response.raise_for_status()
result = response.json()
if result.get("success"):
logger.debug(f"Turnstile verification successful for IP {remoteip}")
return True, ""
else:
error_codes = result.get("error-codes", [])
logger.warning(f"Turnstile verification failed for IP {remoteip}, errors: {error_codes}")
# 根据错误代码提供友好的错误消息
if "timeout-or-duplicate" in error_codes:
return False, "Verification token expired or already used"
elif "invalid-input-response" in error_codes:
return False, "Invalid verification token"
elif "missing-input-response" in error_codes:
return False, "Verification token is required"
else:
return False, "Verification failed"
except httpx.TimeoutException:
logger.error("Turnstile verification timeout")
return False, "Verification service timeout"
except httpx.HTTPError as e:
logger.error(f"Turnstile verification HTTP error: {e}")
return False, "Verification service error"
except Exception as e: # Catch any unexpected errors
logger.exception(f"Turnstile verification unexpected error: {e}")
return False, "Verification service error"
turnstile_service = TurnstileService()