From e5dc11cf86dc73776d4374713111eb14e31bc8a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=95=E8=B0=B7=E9=85=B1?= <74496778+GooGuJiang@users.noreply.github.com> Date: Fri, 22 Aug 2025 09:22:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=82=AE=E4=BB=B6=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/database/password_reset.py | 26 ++ app/router/password_reset_admin.py | 178 ++++++++++++++ app/service/email_queue.py | 276 ++++++++++++++++++++++ app/service/email_verification_service.py | 166 ++++++++++++- app/service/password_reset_service.py | 60 +++-- main.py | 3 + 6 files changed, 680 insertions(+), 29 deletions(-) create mode 100644 app/database/password_reset.py create mode 100644 app/router/password_reset_admin.py create mode 100644 app/service/email_queue.py diff --git a/app/database/password_reset.py b/app/database/password_reset.py new file mode 100644 index 0000000..38cc962 --- /dev/null +++ b/app/database/password_reset.py @@ -0,0 +1,26 @@ +""" +密码重置相关数据库模型 +""" + +from __future__ import annotations + +from datetime import datetime, UTC +from sqlmodel import SQLModel, Field +from sqlalchemy import Column, BigInteger, ForeignKey + + +class PasswordReset(SQLModel, table=True): + """密码重置记录""" + + __tablename__: str = "password_resets" + + id: int | None = Field(default=None, primary_key=True) + user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), nullable=False, index=True)) + email: str = Field(index=True) + reset_code: str = Field(max_length=8) # 8位重置验证码 + created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) + expires_at: datetime = Field() # 验证码过期时间 + is_used: bool = Field(default=False) # 是否已使用 + used_at: datetime | None = Field(default=None) + ip_address: str | None = Field(default=None) # 请求IP + user_agent: str | None = Field(default=None) # 用户代理 diff --git a/app/router/password_reset_admin.py b/app/router/password_reset_admin.py new file mode 100644 index 0000000..8abd81c --- /dev/null +++ b/app/router/password_reset_admin.py @@ -0,0 +1,178 @@ +""" +密码重置管理接口 +""" + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import JSONResponse +from redis.asyncio import Redis + +from app.dependencies.database import get_redis +from app.service.password_reset_service import password_reset_service +from app.log import logger + +router = APIRouter(prefix="/admin/password-reset", tags=["密码重置管理"]) + + +@router.get( + "/status/{email}", + name="查询重置状态", + description="查询指定邮箱的密码重置状态" +) +async def get_password_reset_status( + email: str, + redis: Redis = Depends(get_redis), +): + """查询密码重置状态""" + try: + info = await password_reset_service.get_reset_code_info(email, redis) + return JSONResponse( + status_code=200, + content={ + "success": True, + "data": info + } + ) + except Exception as e: + logger.error(f"[Admin] Failed to get password reset status for {email}: {e}") + return JSONResponse( + status_code=500, + content={ + "success": False, + "error": "获取状态失败" + } + ) + + +@router.delete( + "/cleanup/{email}", + name="清理重置数据", + description="强制清理指定邮箱的密码重置数据" +) +async def force_cleanup_reset( + email: str, + redis: Redis = Depends(get_redis), +): + """强制清理密码重置数据""" + try: + success = await password_reset_service.force_cleanup_user_reset(email, redis) + + if success: + return JSONResponse( + status_code=200, + content={ + "success": True, + "message": f"已清理邮箱 {email} 的重置数据" + } + ) + else: + return JSONResponse( + status_code=500, + content={ + "success": False, + "error": "清理失败" + } + ) + except Exception as e: + logger.error(f"[Admin] Failed to cleanup password reset for {email}: {e}") + return JSONResponse( + status_code=500, + content={ + "success": False, + "error": "清理操作失败" + } + ) + + +@router.post( + "/cleanup/expired", + name="清理过期验证码", + description="清理所有过期的密码重置验证码" +) +async def cleanup_expired_codes( + redis: Redis = Depends(get_redis), +): + """清理过期验证码""" + try: + count = await password_reset_service.cleanup_expired_codes(redis) + return JSONResponse( + status_code=200, + content={ + "success": True, + "message": f"已清理 {count} 个过期的验证码", + "cleaned_count": count + } + ) + except Exception as e: + logger.error(f"[Admin] Failed to cleanup expired codes: {e}") + return JSONResponse( + status_code=500, + content={ + "success": False, + "error": "清理操作失败" + } + ) + + +@router.get( + "/stats", + name="重置统计", + description="获取密码重置的统计信息" +) +async def get_reset_statistics( + redis: Redis = Depends(get_redis), +): + """获取重置统计信息""" + try: + # 获取所有重置相关的键 + reset_keys = await redis.keys("password_reset:code:*") + rate_limit_keys = await redis.keys("password_reset:rate_limit:*") + + active_resets = 0 + used_resets = 0 + active_rate_limits = 0 + + # 统计活跃重置 + for key in reset_keys: + data_str = await redis.get(key) + if data_str: + try: + import json + data = json.loads(data_str) + if data.get("used", False): + used_resets += 1 + else: + active_resets += 1 + except: + pass + + # 统计频率限制 + for key in rate_limit_keys: + ttl = await redis.ttl(key) + if ttl > 0: + active_rate_limits += 1 + + stats = { + "total_reset_codes": len(reset_keys), + "active_resets": active_resets, + "used_resets": used_resets, + "active_rate_limits": active_rate_limits, + "total_rate_limit_keys": len(rate_limit_keys) + } + + return JSONResponse( + status_code=200, + content={ + "success": True, + "data": stats + } + ) + + except Exception as e: + logger.error(f"[Admin] Failed to get reset statistics: {e}") + return JSONResponse( + status_code=500, + content={ + "success": False, + "error": "获取统计信息失败" + } + ) diff --git a/app/service/email_queue.py b/app/service/email_queue.py new file mode 100644 index 0000000..a583836 --- /dev/null +++ b/app/service/email_queue.py @@ -0,0 +1,276 @@ +""" +邮件队列服务 +用于异步发送邮件 +""" + +from __future__ import annotations + +import asyncio +import concurrent.futures +from datetime import datetime +import json +import uuid +import smtplib +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from typing import Dict, Any, Optional +import redis as sync_redis # 添加同步Redis导入 + +from app.config import settings +from app.dependencies.database import redis_message_client # 使用同步Redis客户端 +from app.log import logger + + +class EmailQueue: + """Redis 邮件队列服务""" + + def __init__(self): + # 创建专门用于邮件队列的同步Redis客户端 (db=0) + self.redis = sync_redis.from_url(settings.redis_url, decode_responses=True, db=0) + self._processing = False + self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) + self._retry_limit = 3 # 重试次数限制 + + # 邮件配置 + self.smtp_server = getattr(settings, 'smtp_server', 'localhost') + self.smtp_port = getattr(settings, 'smtp_port', 587) + self.smtp_username = getattr(settings, 'smtp_username', '') + self.smtp_password = getattr(settings, 'smtp_password', '') + self.from_email = getattr(settings, 'from_email', 'noreply@example.com') + self.from_name = getattr(settings, 'from_name', 'osu! server') + + async def _run_in_executor(self, func, *args): + """在线程池中运行同步操作""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(self._executor, func, *args) + + async def start_processing(self): + """启动邮件处理任务""" + if not self._processing: + self._processing = True + asyncio.create_task(self._process_email_queue()) + logger.info("Email queue processing started") + + async def stop_processing(self): + """停止邮件处理""" + self._processing = False + logger.info("Email queue processing stopped") + + async def enqueue_email(self, + to_email: str, + subject: str, + content: str, + html_content: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None) -> str: + """ + 将邮件加入队列等待发送 + + Args: + to_email: 收件人邮箱地址 + subject: 邮件主题 + content: 邮件纯文本内容 + html_content: 邮件HTML内容(如果有) + metadata: 额外元数据(如密码重置ID等) + + Returns: + 邮件任务ID + """ + email_id = str(uuid.uuid4()) + + email_data = { + "id": email_id, + "to_email": to_email, + "subject": subject, + "content": content, + "html_content": html_content if html_content else "", + "metadata": json.dumps(metadata) if metadata else "{}", + "created_at": datetime.now().isoformat(), + "status": "pending", # pending, sending, sent, failed + "retry_count": "0" + } + + # 将邮件数据存入Redis + await self._run_in_executor( + lambda: self.redis.hset(f"email:{email_id}", mapping=email_data) + ) + + # 设置24小时过期(防止数据堆积) + await self._run_in_executor( + self.redis.expire, f"email:{email_id}", 86400 + ) + + # 加入发送队列 + await self._run_in_executor( + self.redis.lpush, "email_queue", email_id + ) + + logger.info(f"Email enqueued with id: {email_id} to {to_email}") + return email_id + + async def get_email_status(self, email_id: str) -> Dict[str, Any]: + """ + 获取邮件发送状态 + + Args: + email_id: 邮件任务ID + + Returns: + 邮件任务状态信息 + """ + email_data = await self._run_in_executor( + self.redis.hgetall, f"email:{email_id}" + ) + + # 解码Redis返回的字节数据 + if email_data: + return { + k.decode("utf-8") if isinstance(k, bytes) else k: + v.decode("utf-8") if isinstance(v, bytes) else v + for k, v in email_data.items() + } + + return {"status": "not_found"} + + async def _process_email_queue(self): + """处理邮件队列""" + logger.info("Starting email queue processor") + + while self._processing: + try: + # 从队列获取邮件ID + def brpop_operation(): + return self.redis.brpop(["email_queue"], timeout=5) + + result = await self._run_in_executor(brpop_operation) + + if not result: + await asyncio.sleep(1) + continue + + # 解包返回结果(列表名和值) + queue_name, email_id = result + if isinstance(email_id, bytes): + email_id = email_id.decode("utf-8") + + # 获取邮件数据 + email_data = await self.get_email_status(email_id) + if email_data.get("status") == "not_found": + logger.warning(f"Email data not found for id: {email_id}") + continue + + # 更新状态为发送中 + await self._run_in_executor( + self.redis.hset, f"email:{email_id}", "status", "sending" + ) + + # 尝试发送邮件 + success = await self._send_email(email_data) + + if success: + # 更新状态为已发送 + await self._run_in_executor( + self.redis.hset, f"email:{email_id}", "status", "sent" + ) + await self._run_in_executor( + self.redis.hset, f"email:{email_id}", "sent_at", datetime.now().isoformat() + ) + logger.info(f"Email {email_id} sent successfully to {email_data.get('to_email')}") + else: + # 计算重试次数 + retry_count = int(email_data.get("retry_count", "0")) + 1 + + if retry_count <= self._retry_limit: + # 重新入队,稍后重试 + await self._run_in_executor( + self.redis.hset, f"email:{email_id}", "retry_count", str(retry_count) + ) + await self._run_in_executor( + self.redis.hset, f"email:{email_id}", "status", "pending" + ) + await self._run_in_executor( + self.redis.hset, f"email:{email_id}", "last_retry", datetime.now().isoformat() + ) + + # 延迟重试(使用指数退避) + delay = 60 * (2 ** (retry_count - 1)) # 1分钟,2分钟,4分钟... + + # 创建延迟任务 + asyncio.create_task(self._delayed_retry(email_id, delay)) + + logger.warning(f"Email {email_id} will be retried in {delay} seconds (attempt {retry_count})") + else: + # 超过重试次数,标记为失败 + await self._run_in_executor( + self.redis.hset, f"email:{email_id}", "status", "failed" + ) + logger.error(f"Email {email_id} failed after {retry_count} attempts") + + except Exception as e: + logger.error(f"Error processing email queue: {e}") + await asyncio.sleep(5) # 出错后等待5秒 + + async def _delayed_retry(self, email_id: str, delay: int): + """延迟重试发送邮件""" + await asyncio.sleep(delay) + await self._run_in_executor( + self.redis.lpush, "email_queue", email_id + ) + logger.info(f"Re-queued email {email_id} for retry after {delay} seconds") + + async def _send_email(self, email_data: Dict[str, Any]) -> bool: + """ + 实际发送邮件 + + Args: + email_data: 邮件数据 + + Returns: + 是否发送成功 + """ + try: + # 如果邮件发送功能被禁用,则只记录日志 + if not getattr(settings, 'enable_email_sending', True): + logger.info(f"[Mock Email] Would send to {email_data.get('to_email')}: {email_data.get('subject')}") + return True + + # 创建邮件 + msg = MIMEMultipart('alternative') + msg['From'] = f"{self.from_name} <{self.from_email}>" + msg['To'] = email_data.get('to_email', '') + msg['Subject'] = email_data.get('subject', '') + + # 添加纯文本内容 + content = email_data.get('content', '') + if content: + msg.attach(MIMEText(content, 'plain', 'utf-8')) + + # 添加HTML内容(如果有) + html_content = email_data.get('html_content', '') + if html_content: + msg.attach(MIMEText(html_content, 'html', 'utf-8')) + + # 发送邮件 + with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: + if self.smtp_username and self.smtp_password: + server.starttls() + server.login(self.smtp_username, self.smtp_password) + + server.send_message(msg) + + return True + + except Exception as e: + logger.error(f"Failed to send email: {e}") + return False + + +# 全局邮件队列实例 +email_queue = EmailQueue() + +# 在应用启动时调用 +async def start_email_processor(): + await email_queue.start_processing() + +# 在应用关闭时调用 +async def stop_email_processor(): + await email_queue.stop_processing() diff --git a/app/service/email_verification_service.py b/app/service/email_verification_service.py index d513fbc..5207fa9 100644 --- a/app/service/email_verification_service.py +++ b/app/service/email_verification_service.py @@ -11,6 +11,7 @@ from typing import Optional from app.database.email_verification import EmailVerification, LoginSession from app.service.email_service import email_service +from app.service.email_queue import email_queue # 导入邮件队列 from app.log import logger from app.config import settings @@ -27,6 +28,163 @@ class EmailVerificationService: """生成8位验证码""" return ''.join(secrets.choice(string.digits) for _ in range(8)) + @staticmethod + async def send_verification_email_via_queue(email: str, code: str, username: str, user_id: int) -> bool: + """使用邮件队列发送验证邮件 + + Args: + email: 接收验证码的邮箱地址 + code: 验证码 + username: 用户名 + user_id: 用户ID + + Returns: + 是否成功将邮件加入队列 + """ + try: + # HTML 邮件内容 + html_content = f""" + + + + + + + +
+
+

osu! 邮箱验证

+

Email Verification

+
+ +
+

你好 {username}!

+

请使用以下验证码验证您的账户:

+ +
{code}
+ +

验证码将在 10 分钟内有效

+ +
+

重要提示:

+
    +
  • 请不要与任何人分享此验证码
  • +
  • 如果您没有请求验证码,请忽略此邮件
  • +
  • 为了账户安全,请勿在其他网站使用相同的密码
  • +
+
+ +
+ +

Hello {username}!

+

Please use the following verification code to verify your account:

+ +

This verification code will be valid for 10 minutes.

+ +

Important: Do not share this verification code with anyone. If you did not request this code, please ignore this email.

+
+ + +
+ + + """ + + # 纯文本备用内容 + plain_content = f""" +你好 {username}! + +请使用以下验证码验证您的账户: + +{code} + +验证码将在10分钟内有效。 + +重要提示: +- 请不要与任何人分享此验证码 +- 如果您没有请求验证码,请忽略此邮件 +- 为了账户安全,请勿在其他网站使用相同的密码 + +Hello {username}! +Please use the following verification code to verify your account. +This verification code will be valid for 10 minutes. + +© 2025 g0v0! Private Server. 此邮件由系统自动发送,请勿回复。 +This email was sent automatically, please do not reply. +""" + + # 将邮件加入队列 + subject = "邮箱验证 - Email Verification" + metadata = { + "type": "email_verification", + "user_id": user_id, + "code": code + } + + await email_queue.enqueue_email( + to_email=email, + subject=subject, + content=plain_content, + html_content=html_content, + metadata=metadata + ) + + return True + + except Exception as e: + logger.error(f"[Email Verification] Failed to enqueue email: {e}") + return False + @staticmethod def generate_session_token() -> str: """生成会话令牌""" @@ -106,14 +264,14 @@ class EmailVerificationService: db, redis, user_id, email, ip_address, user_agent ) - # 发送邮件 - success = await email_service.send_verification_email(email, code, username) + # 使用邮件队列发送验证邮件 + success = await EmailVerificationService.send_verification_email_via_queue(email, code, username, user_id) if success: - logger.info(f"[Email Verification] Successfully sent verification email to {email} (user: {username})") + logger.info(f"[Email Verification] Successfully enqueued verification email to {email} (user: {username})") return True else: - logger.error(f"[Email Verification] Failed to send verification email: {email} (user: {username})") + logger.error(f"[Email Verification] Failed to enqueue verification email: {email} (user: {username})") return False except Exception as e: diff --git a/app/service/password_reset_service.py b/app/service/password_reset_service.py index afe2865..193b897 100644 --- a/app/service/password_reset_service.py +++ b/app/service/password_reset_service.py @@ -14,6 +14,7 @@ from app.config import settings from app.database import User from app.dependencies.database import with_db from app.service.email_service import EmailService +from app.service.email_queue import email_queue # 导入邮件队列 from app.log import logger from app.auth import get_password_hash, invalidate_user_tokens @@ -134,17 +135,8 @@ class PasswordResetService: return False, "服务暂时不可用,请稍后重试" async def send_password_reset_email(self, email: str, code: str, username: str) -> bool: - """发送密码重置邮件""" + """发送密码重置邮件(使用邮件队列)""" try: - from email.mime.text import MIMEText - from email.mime.multipart import MIMEMultipart - import smtplib - - msg = MIMEMultipart() - msg['From'] = f"{self.email_service.from_name} <{self.email_service.from_email}>" - msg['To'] = email - msg['Subject'] = "密码重置 - Password Reset" - # HTML 邮件内容 html_content = f""" @@ -254,26 +246,44 @@ class PasswordResetService: """ - msg.attach(MIMEText(html_content, 'html', 'utf-8')) + # 纯文本内容(作为备用) + plain_content = f""" +你好 {username}! + +我们收到了您的密码重置请求。如果这是您本人操作,请使用以下验证码重置密码: + +{code} + +这个验证码将在10分钟后过期。 + +安全提醒: +- 请不要与任何人分享这个验证码 +- 如果您没有请求密码重置,请立即忽略这封邮件 +- 验证码只能使用一次 +- 建议设置一个强密码以保护您的账户安全 + +如果您有任何问题,请联系我们的支持团队。 + +© 2025 g0v0! Private Server. 此邮件由系统自动发送,请勿回复。 +""" + + # 添加邮件到队列 + subject = "密码重置 - Password Reset" + metadata = {"type": "password_reset", "email": email, "code": code} - # 发送邮件 - if not getattr(settings, 'enable_email_sending', True): - # 邮件发送功能禁用时只记录日志,不实际发送 - logger.info(f"[Password Reset] Mock sending reset code to {email}: {code}") - return True + await email_queue.enqueue_email( + to_email=email, + subject=subject, + content=plain_content, + html_content=html_content, + metadata=metadata + ) - with smtplib.SMTP(self.email_service.smtp_server, self.email_service.smtp_port) as server: - if self.email_service.smtp_username and self.email_service.smtp_password: - server.starttls() - server.login(self.email_service.smtp_username, self.email_service.smtp_password) - - server.send_message(msg) - - logger.info(f"[Password Reset] Successfully sent reset code to {email}") + logger.info(f"[Password Reset] Enqueued reset code email to {email}") return True except Exception as e: - logger.error(f"[Password Reset] Failed to send email: {e}") + logger.error(f"[Password Reset] Failed to enqueue email: {e}") return False async def reset_password( diff --git a/main.py b/main.py index 716301d..9339cff 100644 --- a/main.py +++ b/main.py @@ -28,6 +28,7 @@ from app.service.beatmap_download_service import download_service from app.service.calculate_all_user_rank import calculate_user_rank from app.service.create_banchobot import create_banchobot from app.service.daily_challenge import daily_challenge_job, process_daily_challenge_top +from app.service.email_queue import start_email_processor, stop_email_processor from app.service.geoip_scheduler import schedule_geoip_updates from app.service.init_geoip import init_geoip from app.service.load_achievements import load_achievements @@ -78,6 +79,7 @@ async def lifespan(app: FastAPI): await daily_challenge_job() await process_daily_challenge_top() await create_banchobot() + await start_email_processor() # 启动邮件队列处理器 await download_service.start_health_check() # 启动下载服务健康检查 await start_cache_scheduler() # 启动缓存调度器 await start_database_cleanup_scheduler() # 启动数据库清理调度器 @@ -92,6 +94,7 @@ async def lifespan(app: FastAPI): await stop_cache_scheduler() # 停止缓存调度器 await stop_database_cleanup_scheduler() # 停止数据库清理调度器 await download_service.stop_health_check() # 停止下载服务健康检查 + await stop_email_processor() # 停止邮件队列处理器 await engine.dispose() await redis_client.aclose()