diff --git a/app/database/password_reset.py b/app/database/password_reset.py
new file mode 100644
index 0000000..38cc962
--- /dev/null
+++ b/app/database/password_reset.py
@@ -0,0 +1,26 @@
+"""
+密码重置相关数据库模型
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, UTC
+from sqlmodel import SQLModel, Field
+from sqlalchemy import Column, BigInteger, ForeignKey
+
+
+class PasswordReset(SQLModel, table=True):
+ """密码重置记录"""
+
+ __tablename__: str = "password_resets"
+
+ id: int | None = Field(default=None, primary_key=True)
+ user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), nullable=False, index=True))
+ email: str = Field(index=True)
+ reset_code: str = Field(max_length=8) # 8位重置验证码
+ created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
+ expires_at: datetime = Field() # 验证码过期时间
+ is_used: bool = Field(default=False) # 是否已使用
+ used_at: datetime | None = Field(default=None)
+ ip_address: str | None = Field(default=None) # 请求IP
+ user_agent: str | None = Field(default=None) # 用户代理
diff --git a/app/router/password_reset_admin.py b/app/router/password_reset_admin.py
new file mode 100644
index 0000000..8abd81c
--- /dev/null
+++ b/app/router/password_reset_admin.py
@@ -0,0 +1,178 @@
+"""
+密码重置管理接口
+"""
+
+from fastapi import APIRouter, Depends, HTTPException
+from fastapi.responses import JSONResponse
+from redis.asyncio import Redis
+
+from app.dependencies.database import get_redis
+from app.service.password_reset_service import password_reset_service
+from app.log import logger
+
+router = APIRouter(prefix="/admin/password-reset", tags=["密码重置管理"])
+
+
+@router.get(
+ "/status/{email}",
+ name="查询重置状态",
+ description="查询指定邮箱的密码重置状态"
+)
+async def get_password_reset_status(
+ email: str,
+ redis: Redis = Depends(get_redis),
+):
+ """查询密码重置状态"""
+ try:
+ info = await password_reset_service.get_reset_code_info(email, redis)
+ return JSONResponse(
+ status_code=200,
+ content={
+ "success": True,
+ "data": info
+ }
+ )
+ except Exception as e:
+ logger.error(f"[Admin] Failed to get password reset status for {email}: {e}")
+ return JSONResponse(
+ status_code=500,
+ content={
+ "success": False,
+ "error": "获取状态失败"
+ }
+ )
+
+
+@router.delete(
+ "/cleanup/{email}",
+ name="清理重置数据",
+ description="强制清理指定邮箱的密码重置数据"
+)
+async def force_cleanup_reset(
+ email: str,
+ redis: Redis = Depends(get_redis),
+):
+ """强制清理密码重置数据"""
+ try:
+ success = await password_reset_service.force_cleanup_user_reset(email, redis)
+
+ if success:
+ return JSONResponse(
+ status_code=200,
+ content={
+ "success": True,
+ "message": f"已清理邮箱 {email} 的重置数据"
+ }
+ )
+ else:
+ return JSONResponse(
+ status_code=500,
+ content={
+ "success": False,
+ "error": "清理失败"
+ }
+ )
+ except Exception as e:
+ logger.error(f"[Admin] Failed to cleanup password reset for {email}: {e}")
+ return JSONResponse(
+ status_code=500,
+ content={
+ "success": False,
+ "error": "清理操作失败"
+ }
+ )
+
+
+@router.post(
+ "/cleanup/expired",
+ name="清理过期验证码",
+ description="清理所有过期的密码重置验证码"
+)
+async def cleanup_expired_codes(
+ redis: Redis = Depends(get_redis),
+):
+ """清理过期验证码"""
+ try:
+ count = await password_reset_service.cleanup_expired_codes(redis)
+ return JSONResponse(
+ status_code=200,
+ content={
+ "success": True,
+ "message": f"已清理 {count} 个过期的验证码",
+ "cleaned_count": count
+ }
+ )
+ except Exception as e:
+ logger.error(f"[Admin] Failed to cleanup expired codes: {e}")
+ return JSONResponse(
+ status_code=500,
+ content={
+ "success": False,
+ "error": "清理操作失败"
+ }
+ )
+
+
+@router.get(
+ "/stats",
+ name="重置统计",
+ description="获取密码重置的统计信息"
+)
+async def get_reset_statistics(
+ redis: Redis = Depends(get_redis),
+):
+ """获取重置统计信息"""
+ try:
+ # 获取所有重置相关的键
+ reset_keys = await redis.keys("password_reset:code:*")
+ rate_limit_keys = await redis.keys("password_reset:rate_limit:*")
+
+ active_resets = 0
+ used_resets = 0
+ active_rate_limits = 0
+
+ # 统计活跃重置
+ for key in reset_keys:
+ data_str = await redis.get(key)
+ if data_str:
+ try:
+ import json
+ data = json.loads(data_str)
+ if data.get("used", False):
+ used_resets += 1
+ else:
+ active_resets += 1
+ except:
+ pass
+
+ # 统计频率限制
+ for key in rate_limit_keys:
+ ttl = await redis.ttl(key)
+ if ttl > 0:
+ active_rate_limits += 1
+
+ stats = {
+ "total_reset_codes": len(reset_keys),
+ "active_resets": active_resets,
+ "used_resets": used_resets,
+ "active_rate_limits": active_rate_limits,
+ "total_rate_limit_keys": len(rate_limit_keys)
+ }
+
+ return JSONResponse(
+ status_code=200,
+ content={
+ "success": True,
+ "data": stats
+ }
+ )
+
+ except Exception as e:
+ logger.error(f"[Admin] Failed to get reset statistics: {e}")
+ return JSONResponse(
+ status_code=500,
+ content={
+ "success": False,
+ "error": "获取统计信息失败"
+ }
+ )
diff --git a/app/service/email_queue.py b/app/service/email_queue.py
new file mode 100644
index 0000000..a583836
--- /dev/null
+++ b/app/service/email_queue.py
@@ -0,0 +1,276 @@
+"""
+邮件队列服务
+用于异步发送邮件
+"""
+
+from __future__ import annotations
+
+import asyncio
+import concurrent.futures
+from datetime import datetime
+import json
+import uuid
+import smtplib
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from typing import Dict, Any, Optional
+import redis as sync_redis # 添加同步Redis导入
+
+from app.config import settings
+from app.dependencies.database import redis_message_client # 使用同步Redis客户端
+from app.log import logger
+
+
+class EmailQueue:
+ """Redis 邮件队列服务"""
+
+ def __init__(self):
+ # 创建专门用于邮件队列的同步Redis客户端 (db=0)
+ self.redis = sync_redis.from_url(settings.redis_url, decode_responses=True, db=0)
+ self._processing = False
+ self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
+ self._retry_limit = 3 # 重试次数限制
+
+ # 邮件配置
+ self.smtp_server = getattr(settings, 'smtp_server', 'localhost')
+ self.smtp_port = getattr(settings, 'smtp_port', 587)
+ self.smtp_username = getattr(settings, 'smtp_username', '')
+ self.smtp_password = getattr(settings, 'smtp_password', '')
+ self.from_email = getattr(settings, 'from_email', 'noreply@example.com')
+ self.from_name = getattr(settings, 'from_name', 'osu! server')
+
+ async def _run_in_executor(self, func, *args):
+ """在线程池中运行同步操作"""
+ loop = asyncio.get_event_loop()
+ return await loop.run_in_executor(self._executor, func, *args)
+
+ async def start_processing(self):
+ """启动邮件处理任务"""
+ if not self._processing:
+ self._processing = True
+ asyncio.create_task(self._process_email_queue())
+ logger.info("Email queue processing started")
+
+ async def stop_processing(self):
+ """停止邮件处理"""
+ self._processing = False
+ logger.info("Email queue processing stopped")
+
+ async def enqueue_email(self,
+ to_email: str,
+ subject: str,
+ content: str,
+ html_content: Optional[str] = None,
+ metadata: Optional[Dict[str, Any]] = None) -> str:
+ """
+ 将邮件加入队列等待发送
+
+ Args:
+ to_email: 收件人邮箱地址
+ subject: 邮件主题
+ content: 邮件纯文本内容
+ html_content: 邮件HTML内容(如果有)
+ metadata: 额外元数据(如密码重置ID等)
+
+ Returns:
+ 邮件任务ID
+ """
+ email_id = str(uuid.uuid4())
+
+ email_data = {
+ "id": email_id,
+ "to_email": to_email,
+ "subject": subject,
+ "content": content,
+ "html_content": html_content if html_content else "",
+ "metadata": json.dumps(metadata) if metadata else "{}",
+ "created_at": datetime.now().isoformat(),
+ "status": "pending", # pending, sending, sent, failed
+ "retry_count": "0"
+ }
+
+ # 将邮件数据存入Redis
+ await self._run_in_executor(
+ lambda: self.redis.hset(f"email:{email_id}", mapping=email_data)
+ )
+
+ # 设置24小时过期(防止数据堆积)
+ await self._run_in_executor(
+ self.redis.expire, f"email:{email_id}", 86400
+ )
+
+ # 加入发送队列
+ await self._run_in_executor(
+ self.redis.lpush, "email_queue", email_id
+ )
+
+ logger.info(f"Email enqueued with id: {email_id} to {to_email}")
+ return email_id
+
+ async def get_email_status(self, email_id: str) -> Dict[str, Any]:
+ """
+ 获取邮件发送状态
+
+ Args:
+ email_id: 邮件任务ID
+
+ Returns:
+ 邮件任务状态信息
+ """
+ email_data = await self._run_in_executor(
+ self.redis.hgetall, f"email:{email_id}"
+ )
+
+ # 解码Redis返回的字节数据
+ if email_data:
+ return {
+ k.decode("utf-8") if isinstance(k, bytes) else k:
+ v.decode("utf-8") if isinstance(v, bytes) else v
+ for k, v in email_data.items()
+ }
+
+ return {"status": "not_found"}
+
+ async def _process_email_queue(self):
+ """处理邮件队列"""
+ logger.info("Starting email queue processor")
+
+ while self._processing:
+ try:
+ # 从队列获取邮件ID
+ def brpop_operation():
+ return self.redis.brpop(["email_queue"], timeout=5)
+
+ result = await self._run_in_executor(brpop_operation)
+
+ if not result:
+ await asyncio.sleep(1)
+ continue
+
+ # 解包返回结果(列表名和值)
+ queue_name, email_id = result
+ if isinstance(email_id, bytes):
+ email_id = email_id.decode("utf-8")
+
+ # 获取邮件数据
+ email_data = await self.get_email_status(email_id)
+ if email_data.get("status") == "not_found":
+ logger.warning(f"Email data not found for id: {email_id}")
+ continue
+
+ # 更新状态为发送中
+ await self._run_in_executor(
+ self.redis.hset, f"email:{email_id}", "status", "sending"
+ )
+
+ # 尝试发送邮件
+ success = await self._send_email(email_data)
+
+ if success:
+ # 更新状态为已发送
+ await self._run_in_executor(
+ self.redis.hset, f"email:{email_id}", "status", "sent"
+ )
+ await self._run_in_executor(
+ self.redis.hset, f"email:{email_id}", "sent_at", datetime.now().isoformat()
+ )
+ logger.info(f"Email {email_id} sent successfully to {email_data.get('to_email')}")
+ else:
+ # 计算重试次数
+ retry_count = int(email_data.get("retry_count", "0")) + 1
+
+ if retry_count <= self._retry_limit:
+ # 重新入队,稍后重试
+ await self._run_in_executor(
+ self.redis.hset, f"email:{email_id}", "retry_count", str(retry_count)
+ )
+ await self._run_in_executor(
+ self.redis.hset, f"email:{email_id}", "status", "pending"
+ )
+ await self._run_in_executor(
+ self.redis.hset, f"email:{email_id}", "last_retry", datetime.now().isoformat()
+ )
+
+ # 延迟重试(使用指数退避)
+ delay = 60 * (2 ** (retry_count - 1)) # 1分钟,2分钟,4分钟...
+
+ # 创建延迟任务
+ asyncio.create_task(self._delayed_retry(email_id, delay))
+
+ logger.warning(f"Email {email_id} will be retried in {delay} seconds (attempt {retry_count})")
+ else:
+ # 超过重试次数,标记为失败
+ await self._run_in_executor(
+ self.redis.hset, f"email:{email_id}", "status", "failed"
+ )
+ logger.error(f"Email {email_id} failed after {retry_count} attempts")
+
+ except Exception as e:
+ logger.error(f"Error processing email queue: {e}")
+ await asyncio.sleep(5) # 出错后等待5秒
+
+ async def _delayed_retry(self, email_id: str, delay: int):
+ """延迟重试发送邮件"""
+ await asyncio.sleep(delay)
+ await self._run_in_executor(
+ self.redis.lpush, "email_queue", email_id
+ )
+ logger.info(f"Re-queued email {email_id} for retry after {delay} seconds")
+
+ async def _send_email(self, email_data: Dict[str, Any]) -> bool:
+ """
+ 实际发送邮件
+
+ Args:
+ email_data: 邮件数据
+
+ Returns:
+ 是否发送成功
+ """
+ try:
+ # 如果邮件发送功能被禁用,则只记录日志
+ if not getattr(settings, 'enable_email_sending', True):
+ logger.info(f"[Mock Email] Would send to {email_data.get('to_email')}: {email_data.get('subject')}")
+ return True
+
+ # 创建邮件
+ msg = MIMEMultipart('alternative')
+ msg['From'] = f"{self.from_name} <{self.from_email}>"
+ msg['To'] = email_data.get('to_email', '')
+ msg['Subject'] = email_data.get('subject', '')
+
+ # 添加纯文本内容
+ content = email_data.get('content', '')
+ if content:
+ msg.attach(MIMEText(content, 'plain', 'utf-8'))
+
+ # 添加HTML内容(如果有)
+ html_content = email_data.get('html_content', '')
+ if html_content:
+ msg.attach(MIMEText(html_content, 'html', 'utf-8'))
+
+ # 发送邮件
+ with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
+ if self.smtp_username and self.smtp_password:
+ server.starttls()
+ server.login(self.smtp_username, self.smtp_password)
+
+ server.send_message(msg)
+
+ return True
+
+ except Exception as e:
+ logger.error(f"Failed to send email: {e}")
+ return False
+
+
+# 全局邮件队列实例
+email_queue = EmailQueue()
+
+# 在应用启动时调用
+async def start_email_processor():
+ await email_queue.start_processing()
+
+# 在应用关闭时调用
+async def stop_email_processor():
+ await email_queue.stop_processing()
diff --git a/app/service/email_verification_service.py b/app/service/email_verification_service.py
index d513fbc..5207fa9 100644
--- a/app/service/email_verification_service.py
+++ b/app/service/email_verification_service.py
@@ -11,6 +11,7 @@ from typing import Optional
from app.database.email_verification import EmailVerification, LoginSession
from app.service.email_service import email_service
+from app.service.email_queue import email_queue # 导入邮件队列
from app.log import logger
from app.config import settings
@@ -27,6 +28,163 @@ class EmailVerificationService:
"""生成8位验证码"""
return ''.join(secrets.choice(string.digits) for _ in range(8))
+ @staticmethod
+ async def send_verification_email_via_queue(email: str, code: str, username: str, user_id: int) -> bool:
+ """使用邮件队列发送验证邮件
+
+ Args:
+ email: 接收验证码的邮箱地址
+ code: 验证码
+ username: 用户名
+ user_id: 用户ID
+
+ Returns:
+ 是否成功将邮件加入队列
+ """
+ try:
+ # HTML 邮件内容
+ html_content = f"""
+
+
+
+
+
+
+
+
+
+
+
+
你好 {username}!
+
请使用以下验证码验证您的账户:
+
+
{code}
+
+
验证码将在 10 分钟内有效。
+
+
+
重要提示:
+
+ - 请不要与任何人分享此验证码
+ - 如果您没有请求验证码,请忽略此邮件
+ - 为了账户安全,请勿在其他网站使用相同的密码
+
+
+
+
+
+
Hello {username}!
+
Please use the following verification code to verify your account:
+
+
This verification code will be valid for 10 minutes.
+
+
Important: Do not share this verification code with anyone. If you did not request this code, please ignore this email.
+
+
+
+
+
+
+ """
+
+ # 纯文本备用内容
+ plain_content = f"""
+你好 {username}!
+
+请使用以下验证码验证您的账户:
+
+{code}
+
+验证码将在10分钟内有效。
+
+重要提示:
+- 请不要与任何人分享此验证码
+- 如果您没有请求验证码,请忽略此邮件
+- 为了账户安全,请勿在其他网站使用相同的密码
+
+Hello {username}!
+Please use the following verification code to verify your account.
+This verification code will be valid for 10 minutes.
+
+© 2025 g0v0! Private Server. 此邮件由系统自动发送,请勿回复。
+This email was sent automatically, please do not reply.
+"""
+
+ # 将邮件加入队列
+ subject = "邮箱验证 - Email Verification"
+ metadata = {
+ "type": "email_verification",
+ "user_id": user_id,
+ "code": code
+ }
+
+ await email_queue.enqueue_email(
+ to_email=email,
+ subject=subject,
+ content=plain_content,
+ html_content=html_content,
+ metadata=metadata
+ )
+
+ return True
+
+ except Exception as e:
+ logger.error(f"[Email Verification] Failed to enqueue email: {e}")
+ return False
+
@staticmethod
def generate_session_token() -> str:
"""生成会话令牌"""
@@ -106,14 +264,14 @@ class EmailVerificationService:
db, redis, user_id, email, ip_address, user_agent
)
- # 发送邮件
- success = await email_service.send_verification_email(email, code, username)
+ # 使用邮件队列发送验证邮件
+ success = await EmailVerificationService.send_verification_email_via_queue(email, code, username, user_id)
if success:
- logger.info(f"[Email Verification] Successfully sent verification email to {email} (user: {username})")
+ logger.info(f"[Email Verification] Successfully enqueued verification email to {email} (user: {username})")
return True
else:
- logger.error(f"[Email Verification] Failed to send verification email: {email} (user: {username})")
+ logger.error(f"[Email Verification] Failed to enqueue verification email: {email} (user: {username})")
return False
except Exception as e:
diff --git a/app/service/password_reset_service.py b/app/service/password_reset_service.py
index afe2865..193b897 100644
--- a/app/service/password_reset_service.py
+++ b/app/service/password_reset_service.py
@@ -14,6 +14,7 @@ from app.config import settings
from app.database import User
from app.dependencies.database import with_db
from app.service.email_service import EmailService
+from app.service.email_queue import email_queue # 导入邮件队列
from app.log import logger
from app.auth import get_password_hash, invalidate_user_tokens
@@ -134,17 +135,8 @@ class PasswordResetService:
return False, "服务暂时不可用,请稍后重试"
async def send_password_reset_email(self, email: str, code: str, username: str) -> bool:
- """发送密码重置邮件"""
+ """发送密码重置邮件(使用邮件队列)"""
try:
- from email.mime.text import MIMEText
- from email.mime.multipart import MIMEMultipart
- import smtplib
-
- msg = MIMEMultipart()
- msg['From'] = f"{self.email_service.from_name} <{self.email_service.from_email}>"
- msg['To'] = email
- msg['Subject'] = "密码重置 - Password Reset"
-
# HTML 邮件内容
html_content = f"""
@@ -254,26 +246,44 @@ class PasswordResetService: