添加邮件队列

This commit is contained in:
咕谷酱
2025-08-22 09:22:34 +08:00
parent 0f9238c501
commit e5dc11cf86
6 changed files with 680 additions and 29 deletions

View File

@@ -0,0 +1,26 @@
"""
密码重置相关数据库模型
"""
from __future__ import annotations
from datetime import datetime, UTC
from sqlmodel import SQLModel, Field
from sqlalchemy import Column, BigInteger, ForeignKey
class PasswordReset(SQLModel, table=True):
"""密码重置记录"""
__tablename__: str = "password_resets"
id: int | None = Field(default=None, primary_key=True)
user_id: int = Field(sa_column=Column(BigInteger, ForeignKey("lazer_users.id"), nullable=False, index=True))
email: str = Field(index=True)
reset_code: str = Field(max_length=8) # 8位重置验证码
created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
expires_at: datetime = Field() # 验证码过期时间
is_used: bool = Field(default=False) # 是否已使用
used_at: datetime | None = Field(default=None)
ip_address: str | None = Field(default=None) # 请求IP
user_agent: str | None = Field(default=None) # 用户代理

View File

@@ -0,0 +1,178 @@
"""
密码重置管理接口
"""
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from redis.asyncio import Redis
from app.dependencies.database import get_redis
from app.service.password_reset_service import password_reset_service
from app.log import logger
router = APIRouter(prefix="/admin/password-reset", tags=["密码重置管理"])
@router.get(
"/status/{email}",
name="查询重置状态",
description="查询指定邮箱的密码重置状态"
)
async def get_password_reset_status(
email: str,
redis: Redis = Depends(get_redis),
):
"""查询密码重置状态"""
try:
info = await password_reset_service.get_reset_code_info(email, redis)
return JSONResponse(
status_code=200,
content={
"success": True,
"data": info
}
)
except Exception as e:
logger.error(f"[Admin] Failed to get password reset status for {email}: {e}")
return JSONResponse(
status_code=500,
content={
"success": False,
"error": "获取状态失败"
}
)
@router.delete(
"/cleanup/{email}",
name="清理重置数据",
description="强制清理指定邮箱的密码重置数据"
)
async def force_cleanup_reset(
email: str,
redis: Redis = Depends(get_redis),
):
"""强制清理密码重置数据"""
try:
success = await password_reset_service.force_cleanup_user_reset(email, redis)
if success:
return JSONResponse(
status_code=200,
content={
"success": True,
"message": f"已清理邮箱 {email} 的重置数据"
}
)
else:
return JSONResponse(
status_code=500,
content={
"success": False,
"error": "清理失败"
}
)
except Exception as e:
logger.error(f"[Admin] Failed to cleanup password reset for {email}: {e}")
return JSONResponse(
status_code=500,
content={
"success": False,
"error": "清理操作失败"
}
)
@router.post(
"/cleanup/expired",
name="清理过期验证码",
description="清理所有过期的密码重置验证码"
)
async def cleanup_expired_codes(
redis: Redis = Depends(get_redis),
):
"""清理过期验证码"""
try:
count = await password_reset_service.cleanup_expired_codes(redis)
return JSONResponse(
status_code=200,
content={
"success": True,
"message": f"已清理 {count} 个过期的验证码",
"cleaned_count": count
}
)
except Exception as e:
logger.error(f"[Admin] Failed to cleanup expired codes: {e}")
return JSONResponse(
status_code=500,
content={
"success": False,
"error": "清理操作失败"
}
)
@router.get(
"/stats",
name="重置统计",
description="获取密码重置的统计信息"
)
async def get_reset_statistics(
redis: Redis = Depends(get_redis),
):
"""获取重置统计信息"""
try:
# 获取所有重置相关的键
reset_keys = await redis.keys("password_reset:code:*")
rate_limit_keys = await redis.keys("password_reset:rate_limit:*")
active_resets = 0
used_resets = 0
active_rate_limits = 0
# 统计活跃重置
for key in reset_keys:
data_str = await redis.get(key)
if data_str:
try:
import json
data = json.loads(data_str)
if data.get("used", False):
used_resets += 1
else:
active_resets += 1
except:
pass
# 统计频率限制
for key in rate_limit_keys:
ttl = await redis.ttl(key)
if ttl > 0:
active_rate_limits += 1
stats = {
"total_reset_codes": len(reset_keys),
"active_resets": active_resets,
"used_resets": used_resets,
"active_rate_limits": active_rate_limits,
"total_rate_limit_keys": len(rate_limit_keys)
}
return JSONResponse(
status_code=200,
content={
"success": True,
"data": stats
}
)
except Exception as e:
logger.error(f"[Admin] Failed to get reset statistics: {e}")
return JSONResponse(
status_code=500,
content={
"success": False,
"error": "获取统计信息失败"
}
)

276
app/service/email_queue.py Normal file
View File

@@ -0,0 +1,276 @@
"""
邮件队列服务
用于异步发送邮件
"""
from __future__ import annotations
import asyncio
import concurrent.futures
from datetime import datetime
import json
import uuid
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Any, Optional
import redis as sync_redis # 添加同步Redis导入
from app.config import settings
from app.dependencies.database import redis_message_client # 使用同步Redis客户端
from app.log import logger
class EmailQueue:
"""Redis 邮件队列服务"""
def __init__(self):
# 创建专门用于邮件队列的同步Redis客户端 (db=0)
self.redis = sync_redis.from_url(settings.redis_url, decode_responses=True, db=0)
self._processing = False
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
self._retry_limit = 3 # 重试次数限制
# 邮件配置
self.smtp_server = getattr(settings, 'smtp_server', 'localhost')
self.smtp_port = getattr(settings, 'smtp_port', 587)
self.smtp_username = getattr(settings, 'smtp_username', '')
self.smtp_password = getattr(settings, 'smtp_password', '')
self.from_email = getattr(settings, 'from_email', 'noreply@example.com')
self.from_name = getattr(settings, 'from_name', 'osu! server')
async def _run_in_executor(self, func, *args):
"""在线程池中运行同步操作"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._executor, func, *args)
async def start_processing(self):
"""启动邮件处理任务"""
if not self._processing:
self._processing = True
asyncio.create_task(self._process_email_queue())
logger.info("Email queue processing started")
async def stop_processing(self):
"""停止邮件处理"""
self._processing = False
logger.info("Email queue processing stopped")
async def enqueue_email(self,
to_email: str,
subject: str,
content: str,
html_content: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None) -> str:
"""
将邮件加入队列等待发送
Args:
to_email: 收件人邮箱地址
subject: 邮件主题
content: 邮件纯文本内容
html_content: 邮件HTML内容如果有
metadata: 额外元数据如密码重置ID等
Returns:
邮件任务ID
"""
email_id = str(uuid.uuid4())
email_data = {
"id": email_id,
"to_email": to_email,
"subject": subject,
"content": content,
"html_content": html_content if html_content else "",
"metadata": json.dumps(metadata) if metadata else "{}",
"created_at": datetime.now().isoformat(),
"status": "pending", # pending, sending, sent, failed
"retry_count": "0"
}
# 将邮件数据存入Redis
await self._run_in_executor(
lambda: self.redis.hset(f"email:{email_id}", mapping=email_data)
)
# 设置24小时过期防止数据堆积
await self._run_in_executor(
self.redis.expire, f"email:{email_id}", 86400
)
# 加入发送队列
await self._run_in_executor(
self.redis.lpush, "email_queue", email_id
)
logger.info(f"Email enqueued with id: {email_id} to {to_email}")
return email_id
async def get_email_status(self, email_id: str) -> Dict[str, Any]:
"""
获取邮件发送状态
Args:
email_id: 邮件任务ID
Returns:
邮件任务状态信息
"""
email_data = await self._run_in_executor(
self.redis.hgetall, f"email:{email_id}"
)
# 解码Redis返回的字节数据
if email_data:
return {
k.decode("utf-8") if isinstance(k, bytes) else k:
v.decode("utf-8") if isinstance(v, bytes) else v
for k, v in email_data.items()
}
return {"status": "not_found"}
async def _process_email_queue(self):
"""处理邮件队列"""
logger.info("Starting email queue processor")
while self._processing:
try:
# 从队列获取邮件ID
def brpop_operation():
return self.redis.brpop(["email_queue"], timeout=5)
result = await self._run_in_executor(brpop_operation)
if not result:
await asyncio.sleep(1)
continue
# 解包返回结果(列表名和值)
queue_name, email_id = result
if isinstance(email_id, bytes):
email_id = email_id.decode("utf-8")
# 获取邮件数据
email_data = await self.get_email_status(email_id)
if email_data.get("status") == "not_found":
logger.warning(f"Email data not found for id: {email_id}")
continue
# 更新状态为发送中
await self._run_in_executor(
self.redis.hset, f"email:{email_id}", "status", "sending"
)
# 尝试发送邮件
success = await self._send_email(email_data)
if success:
# 更新状态为已发送
await self._run_in_executor(
self.redis.hset, f"email:{email_id}", "status", "sent"
)
await self._run_in_executor(
self.redis.hset, f"email:{email_id}", "sent_at", datetime.now().isoformat()
)
logger.info(f"Email {email_id} sent successfully to {email_data.get('to_email')}")
else:
# 计算重试次数
retry_count = int(email_data.get("retry_count", "0")) + 1
if retry_count <= self._retry_limit:
# 重新入队,稍后重试
await self._run_in_executor(
self.redis.hset, f"email:{email_id}", "retry_count", str(retry_count)
)
await self._run_in_executor(
self.redis.hset, f"email:{email_id}", "status", "pending"
)
await self._run_in_executor(
self.redis.hset, f"email:{email_id}", "last_retry", datetime.now().isoformat()
)
# 延迟重试(使用指数退避)
delay = 60 * (2 ** (retry_count - 1)) # 1分钟2分钟4分钟...
# 创建延迟任务
asyncio.create_task(self._delayed_retry(email_id, delay))
logger.warning(f"Email {email_id} will be retried in {delay} seconds (attempt {retry_count})")
else:
# 超过重试次数,标记为失败
await self._run_in_executor(
self.redis.hset, f"email:{email_id}", "status", "failed"
)
logger.error(f"Email {email_id} failed after {retry_count} attempts")
except Exception as e:
logger.error(f"Error processing email queue: {e}")
await asyncio.sleep(5) # 出错后等待5秒
async def _delayed_retry(self, email_id: str, delay: int):
"""延迟重试发送邮件"""
await asyncio.sleep(delay)
await self._run_in_executor(
self.redis.lpush, "email_queue", email_id
)
logger.info(f"Re-queued email {email_id} for retry after {delay} seconds")
async def _send_email(self, email_data: Dict[str, Any]) -> bool:
"""
实际发送邮件
Args:
email_data: 邮件数据
Returns:
是否发送成功
"""
try:
# 如果邮件发送功能被禁用,则只记录日志
if not getattr(settings, 'enable_email_sending', True):
logger.info(f"[Mock Email] Would send to {email_data.get('to_email')}: {email_data.get('subject')}")
return True
# 创建邮件
msg = MIMEMultipart('alternative')
msg['From'] = f"{self.from_name} <{self.from_email}>"
msg['To'] = email_data.get('to_email', '')
msg['Subject'] = email_data.get('subject', '')
# 添加纯文本内容
content = email_data.get('content', '')
if content:
msg.attach(MIMEText(content, 'plain', 'utf-8'))
# 添加HTML内容如果有
html_content = email_data.get('html_content', '')
if html_content:
msg.attach(MIMEText(html_content, 'html', 'utf-8'))
# 发送邮件
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
if self.smtp_username and self.smtp_password:
server.starttls()
server.login(self.smtp_username, self.smtp_password)
server.send_message(msg)
return True
except Exception as e:
logger.error(f"Failed to send email: {e}")
return False
# 全局邮件队列实例
email_queue = EmailQueue()
# 在应用启动时调用
async def start_email_processor():
await email_queue.start_processing()
# 在应用关闭时调用
async def stop_email_processor():
await email_queue.stop_processing()

View File

@@ -11,6 +11,7 @@ from typing import Optional
from app.database.email_verification import EmailVerification, LoginSession
from app.service.email_service import email_service
from app.service.email_queue import email_queue # 导入邮件队列
from app.log import logger
from app.config import settings
@@ -27,6 +28,163 @@ class EmailVerificationService:
"""生成8位验证码"""
return ''.join(secrets.choice(string.digits) for _ in range(8))
@staticmethod
async def send_verification_email_via_queue(email: str, code: str, username: str, user_id: int) -> bool:
"""使用邮件队列发送验证邮件
Args:
email: 接收验证码的邮箱地址
code: 验证码
username: 用户名
user_id: 用户ID
Returns:
是否成功将邮件加入队列
"""
try:
# HTML 邮件内容
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
.container {{
max-width: 600px;
margin: 0 auto;
font-family: Arial, sans-serif;
line-height: 1.6;
}}
.header {{
background: linear-gradient(135deg, #ff66aa, #ff9966);
color: white;
padding: 20px;
text-align: center;
border-radius: 10px 10px 0 0;
}}
.content {{
background: #f9f9f9;
padding: 30px;
border: 1px solid #ddd;
}}
.code {{
background: #fff;
border: 2px solid #ff66aa;
border-radius: 8px;
padding: 15px;
text-align: center;
font-size: 24px;
font-weight: bold;
letter-spacing: 3px;
margin: 20px 0;
color: #333;
}}
.footer {{
background: #333;
color: #fff;
padding: 15px;
text-align: center;
border-radius: 0 0 10px 10px;
font-size: 12px;
}}
.warning {{
background: #fff3cd;
border: 1px solid #ffeaa7;
border-radius: 5px;
padding: 10px;
margin: 15px 0;
color: #856404;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>osu! 邮箱验证</h1>
<p>Email Verification</p>
</div>
<div class="content">
<h2>你好 {username}</h2>
<p>请使用以下验证码验证您的账户:</p>
<div class="code">{code}</div>
<p>验证码将在 <strong>10 分钟内有效</strong>。</p>
<div class="warning">
<p><strong>重要提示:</strong></p>
<ul>
<li>请不要与任何人分享此验证码</li>
<li>如果您没有请求验证码,请忽略此邮件</li>
<li>为了账户安全,请勿在其他网站使用相同的密码</li>
</ul>
</div>
<hr style="border: none; border-top: 1px solid #ddd; margin: 20px 0;">
<h3>Hello {username}!</h3>
<p>Please use the following verification code to verify your account:</p>
<p>This verification code will be valid for <strong>10 minutes</strong>.</p>
<p><strong>Important:</strong> Do not share this verification code with anyone. If you did not request this code, please ignore this email.</p>
</div>
<div class="footer">
<p>© 2025 g0v0! Private Server. 此邮件由系统自动发送,请勿回复。</p>
<p>This email was sent automatically, please do not reply.</p>
</div>
</div>
</body>
</html>
"""
# 纯文本备用内容
plain_content = f"""
你好 {username}
请使用以下验证码验证您的账户:
{code}
验证码将在10分钟内有效。
重要提示:
- 请不要与任何人分享此验证码
- 如果您没有请求验证码,请忽略此邮件
- 为了账户安全,请勿在其他网站使用相同的密码
Hello {username}!
Please use the following verification code to verify your account.
This verification code will be valid for 10 minutes.
© 2025 g0v0! Private Server. 此邮件由系统自动发送,请勿回复。
This email was sent automatically, please do not reply.
"""
# 将邮件加入队列
subject = "邮箱验证 - Email Verification"
metadata = {
"type": "email_verification",
"user_id": user_id,
"code": code
}
await email_queue.enqueue_email(
to_email=email,
subject=subject,
content=plain_content,
html_content=html_content,
metadata=metadata
)
return True
except Exception as e:
logger.error(f"[Email Verification] Failed to enqueue email: {e}")
return False
@staticmethod
def generate_session_token() -> str:
"""生成会话令牌"""
@@ -106,14 +264,14 @@ class EmailVerificationService:
db, redis, user_id, email, ip_address, user_agent
)
# 发送邮件
success = await email_service.send_verification_email(email, code, username)
# 使用邮件队列发送验证邮件
success = await EmailVerificationService.send_verification_email_via_queue(email, code, username, user_id)
if success:
logger.info(f"[Email Verification] Successfully sent verification email to {email} (user: {username})")
logger.info(f"[Email Verification] Successfully enqueued verification email to {email} (user: {username})")
return True
else:
logger.error(f"[Email Verification] Failed to send verification email: {email} (user: {username})")
logger.error(f"[Email Verification] Failed to enqueue verification email: {email} (user: {username})")
return False
except Exception as e:

View File

@@ -14,6 +14,7 @@ from app.config import settings
from app.database import User
from app.dependencies.database import with_db
from app.service.email_service import EmailService
from app.service.email_queue import email_queue # 导入邮件队列
from app.log import logger
from app.auth import get_password_hash, invalidate_user_tokens
@@ -134,17 +135,8 @@ class PasswordResetService:
return False, "服务暂时不可用,请稍后重试"
async def send_password_reset_email(self, email: str, code: str, username: str) -> bool:
"""发送密码重置邮件"""
"""发送密码重置邮件(使用邮件队列)"""
try:
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
msg = MIMEMultipart()
msg['From'] = f"{self.email_service.from_name} <{self.email_service.from_email}>"
msg['To'] = email
msg['Subject'] = "密码重置 - Password Reset"
# HTML 邮件内容
html_content = f"""
<!DOCTYPE html>
@@ -254,26 +246,44 @@ class PasswordResetService:
</html>
"""
msg.attach(MIMEText(html_content, 'html', 'utf-8'))
# 纯文本内容(作为备用)
plain_content = f"""
你好 {username}
# 发送邮件
if not getattr(settings, 'enable_email_sending', True):
# 邮件发送功能禁用时只记录日志,不实际发送
logger.info(f"[Password Reset] Mock sending reset code to {email}: {code}")
return True
我们收到了您的密码重置请求。如果这是您本人操作,请使用以下验证码重置密码:
with smtplib.SMTP(self.email_service.smtp_server, self.email_service.smtp_port) as server:
if self.email_service.smtp_username and self.email_service.smtp_password:
server.starttls()
server.login(self.email_service.smtp_username, self.email_service.smtp_password)
{code}
server.send_message(msg)
这个验证码将在10分钟后过期。
logger.info(f"[Password Reset] Successfully sent reset code to {email}")
安全提醒:
- 请不要与任何人分享这个验证码
- 如果您没有请求密码重置,请立即忽略这封邮件
- 验证码只能使用一次
- 建议设置一个强密码以保护您的账户安全
如果您有任何问题,请联系我们的支持团队。
© 2025 g0v0! Private Server. 此邮件由系统自动发送,请勿回复。
"""
# 添加邮件到队列
subject = "密码重置 - Password Reset"
metadata = {"type": "password_reset", "email": email, "code": code}
await email_queue.enqueue_email(
to_email=email,
subject=subject,
content=plain_content,
html_content=html_content,
metadata=metadata
)
logger.info(f"[Password Reset] Enqueued reset code email to {email}")
return True
except Exception as e:
logger.error(f"[Password Reset] Failed to send email: {e}")
logger.error(f"[Password Reset] Failed to enqueue email: {e}")
return False
async def reset_password(

View File

@@ -28,6 +28,7 @@ from app.service.beatmap_download_service import download_service
from app.service.calculate_all_user_rank import calculate_user_rank
from app.service.create_banchobot import create_banchobot
from app.service.daily_challenge import daily_challenge_job, process_daily_challenge_top
from app.service.email_queue import start_email_processor, stop_email_processor
from app.service.geoip_scheduler import schedule_geoip_updates
from app.service.init_geoip import init_geoip
from app.service.load_achievements import load_achievements
@@ -78,6 +79,7 @@ async def lifespan(app: FastAPI):
await daily_challenge_job()
await process_daily_challenge_top()
await create_banchobot()
await start_email_processor() # 启动邮件队列处理器
await download_service.start_health_check() # 启动下载服务健康检查
await start_cache_scheduler() # 启动缓存调度器
await start_database_cleanup_scheduler() # 启动数据库清理调度器
@@ -92,6 +94,7 @@ async def lifespan(app: FastAPI):
await stop_cache_scheduler() # 停止缓存调度器
await stop_database_cleanup_scheduler() # 停止数据库清理调度器
await download_service.stop_health_check() # 停止下载服务健康检查
await stop_email_processor() # 停止邮件队列处理器
await engine.dispose()
await redis_client.aclose()