添加密码重置

This commit is contained in:
咕谷酱
2025-08-22 08:56:40 +08:00
parent 3bee2421fa
commit 0f9238c501
7 changed files with 727 additions and 7 deletions

View File

@@ -176,6 +176,27 @@ def generate_refresh_token() -> str:
return "".join(secrets.choice(characters) for _ in range(length)) return "".join(secrets.choice(characters) for _ in range(length))
async def invalidate_user_tokens(db: AsyncSession, user_id: int) -> int:
"""使指定用户的所有令牌失效
返回删除的令牌数量
"""
# 使用 select 先获取所有令牌
stmt = select(OAuthToken).where(OAuthToken.user_id == user_id)
result = await db.exec(stmt)
tokens = result.all()
# 逐个删除令牌
count = 0
for token in tokens:
await db.delete(token)
count += 1
# 提交更改
await db.commit()
return count
def verify_token(token: str) -> dict | None: def verify_token(token: str) -> dict | None:
"""验证访问令牌""" """验证访问令牌"""
try: try:

View File

@@ -35,6 +35,7 @@ from app.service.email_verification_service import (
EmailVerificationService, EmailVerificationService,
LoginSessionService LoginSessionService
) )
from app.service.password_reset_service import password_reset_service
from fastapi import APIRouter, Depends, Form, Request from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
@@ -561,3 +562,95 @@ async def oauth_token(
refresh_token=refresh_token_str, refresh_token=refresh_token_str,
scope=" ".join(scopes), scope=" ".join(scopes),
) )
@router.post(
"/password-reset/request",
name="请求密码重置",
description="通过邮箱请求密码重置验证码"
)
async def request_password_reset(
request: Request,
email: str = Form(..., description="邮箱地址"),
redis: Redis = Depends(get_redis),
):
"""
请求密码重置
"""
from app.dependencies.geoip import get_client_ip
# 获取客户端信息
ip_address = get_client_ip(request)
user_agent = request.headers.get("User-Agent", "")
# 请求密码重置
success, message = await password_reset_service.request_password_reset(
email=email.lower().strip(),
ip_address=ip_address,
user_agent=user_agent,
redis=redis
)
if success:
return JSONResponse(
status_code=200,
content={
"success": True,
"message": message
}
)
else:
return JSONResponse(
status_code=400,
content={
"success": False,
"error": message
}
)
@router.post(
"/password-reset/reset",
name="重置密码",
description="使用验证码重置密码"
)
async def reset_password(
request: Request,
email: str = Form(..., description="邮箱地址"),
reset_code: str = Form(..., description="重置验证码"),
new_password: str = Form(..., description="新密码"),
redis: Redis = Depends(get_redis),
):
"""
重置密码
"""
from app.dependencies.geoip import get_client_ip
# 获取客户端信息
ip_address = get_client_ip(request)
# 重置密码
success, message = await password_reset_service.reset_password(
email=email.lower().strip(),
reset_code=reset_code.strip(),
new_password=new_password,
ip_address=ip_address,
redis=redis
)
if success:
return JSONResponse(
status_code=200,
content={
"success": True,
"message": message
}
)
else:
return JSONResponse(
status_code=400,
content={
"success": False,
"error": message
}
)

View File

@@ -98,7 +98,7 @@ class EmailService:
<body> <body>
<div class="container"> <div class="container">
<div class="header"> <div class="header">
<h1> osu! 邮箱验证</h1> <h1>osu! 邮箱验证</h1>
<p>Email Verification</p> <p>Email Verification</p>
</div> </div>

View File

@@ -480,14 +480,19 @@ class EnhancedIntervalStatsManager:
keys = await redis_async.keys(pattern) keys = await redis_async.keys(pattern)
for key in keys: for key in keys:
try: try:
# 从key中提取时间 # 从key中提取时间,处理字节或字符串类型
time_part = key.decode().split(":")[-1] # YYYYMMDD_HHMM格式 if isinstance(key, bytes):
key_str = key.decode()
else:
key_str = key
time_part = key_str.split(":")[-1] # YYYYMMDD_HHMM格式
key_time = datetime.strptime(time_part, "%Y%m%d_%H%M") key_time = datetime.strptime(time_part, "%Y%m%d_%H%M")
if key_time < cutoff_time: if key_time < cutoff_time:
await redis_async.delete(key) await redis_async.delete(key)
# 也删除对应的用户集合 # 也删除对应的用户集合
await redis_async.delete(f"{INTERVAL_ONLINE_USERS_KEY}:{key}") # 使用key_str确保正确拼接用户集合键
await redis_async.delete(f"{INTERVAL_ONLINE_USERS_KEY}:{key_str}")
await redis_async.delete(f"{INTERVAL_PLAYING_USERS_KEY}:{key}") await redis_async.delete(f"{INTERVAL_PLAYING_USERS_KEY}:{key}")
except (ValueError, IndexError): except (ValueError, IndexError):

View File

@@ -0,0 +1,429 @@
"""
密码重置服务
"""
from __future__ import annotations
import secrets
import string
from datetime import datetime, UTC, timedelta
from typing import Optional, Tuple
import json
from app.config import settings
from app.database import User
from app.dependencies.database import with_db
from app.service.email_service import EmailService
from app.log import logger
from app.auth import get_password_hash, invalidate_user_tokens
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from redis.asyncio import Redis
class PasswordResetService:
"""密码重置服务 - 使用Redis管理验证码"""
# Redis键前缀
RESET_CODE_PREFIX = "password_reset:code:" # 存储验证码
RESET_RATE_LIMIT_PREFIX = "password_reset:rate_limit:" # 限制请求频率
def __init__(self):
self.email_service = EmailService()
def generate_reset_code(self) -> str:
"""生成8位重置验证码"""
return ''.join(secrets.choice(string.digits) for _ in range(8))
def _get_reset_code_key(self, email: str) -> str:
"""获取验证码Redis键"""
return f"{self.RESET_CODE_PREFIX}{email.lower()}"
def _get_rate_limit_key(self, email: str) -> str:
"""获取频率限制Redis键"""
return f"{self.RESET_RATE_LIMIT_PREFIX}{email.lower()}"
async def request_password_reset(
self,
email: str,
ip_address: str,
user_agent: str,
redis: Redis
) -> Tuple[bool, str]:
"""
请求密码重置
Args:
email: 邮箱地址
ip_address: 请求IP
user_agent: 用户代理
redis: Redis连接
Returns:
Tuple[success, message]
"""
email = email.lower().strip()
async with with_db() as session:
# 查找用户
user_query = select(User).where(User.email == email)
user_result = await session.exec(user_query)
user = user_result.first()
if not user:
# 为了安全考虑,不告诉用户邮箱不存在,但仍然要检查频率限制
rate_limit_key = self._get_rate_limit_key(email)
if await redis.get(rate_limit_key):
return False, "请求过于频繁,请稍后再试"
# 设置一个假的频率限制,防止恶意用户探测邮箱
await redis.setex(rate_limit_key, 60, "1")
return True, "如果该邮箱地址存在,您将收到密码重置邮件"
# 检查频率限制
rate_limit_key = self._get_rate_limit_key(email)
if await redis.get(rate_limit_key):
return False, "请求过于频繁,请稍后再试"
# 生成重置验证码
reset_code = self.generate_reset_code()
# 存储验证码信息到Redis
reset_code_key = self._get_reset_code_key(email)
reset_data = {
"user_id": user.id,
"email": email,
"reset_code": reset_code,
"created_at": datetime.now(UTC).isoformat(),
"ip_address": ip_address,
"user_agent": user_agent,
"used": False
}
try:
# 先设置频率限制
await redis.setex(rate_limit_key, 60, "1")
# 存储验证码10分钟过期
await redis.setex(reset_code_key, 600, json.dumps(reset_data))
# 发送重置邮件
email_sent = await self.send_password_reset_email(
email=email,
code=reset_code,
username=user.username
)
if email_sent:
logger.info(f"[Password Reset] Sent reset code to user {user.id} ({email})")
return True, "密码重置邮件已发送,请查收邮箱"
else:
# 邮件发送失败清理Redis中的数据
await redis.delete(reset_code_key)
await redis.delete(rate_limit_key)
logger.warning(f"[Password Reset] Email sending failed, cleaned up Redis data for {email}")
return False, "邮件发送失败,请稍后重试"
except Exception as e:
# Redis操作失败清理可能的部分数据
try:
await redis.delete(reset_code_key)
await redis.delete(rate_limit_key)
except:
pass
logger.error(f"[Password Reset] Redis operation failed: {e}")
return False, "服务暂时不可用,请稍后重试"
async def send_password_reset_email(self, email: str, code: str, username: str) -> bool:
"""发送密码重置邮件"""
try:
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
msg = MIMEMultipart()
msg['From'] = f"{self.email_service.from_name} <{self.email_service.from_email}>"
msg['To'] = email
msg['Subject'] = "密码重置 - Password Reset"
# HTML 邮件内容
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
.container {{
max-width: 600px;
margin: 0 auto;
font-family: Arial, sans-serif;
line-height: 1.6;
}}
.header {{
background: linear-gradient(135deg, #ff6b6b, #ee5a24);
color: white;
padding: 20px;
text-align: center;
border-radius: 10px 10px 0 0;
}}
.content {{
background: #f9f9f9;
padding: 30px;
border: 1px solid #ddd;
}}
.code {{
background: #fff;
border: 2px solid #ff6b6b;
border-radius: 8px;
padding: 15px;
text-align: center;
font-size: 24px;
font-weight: bold;
letter-spacing: 3px;
margin: 20px 0;
color: #333;
}}
.footer {{
background: #333;
color: #fff;
padding: 15px;
text-align: center;
border-radius: 0 0 10px 10px;
font-size: 12px;
}}
.warning {{
background: #fff3cd;
border: 1px solid #ffeaa7;
border-radius: 5px;
padding: 10px;
margin: 15px 0;
color: #856404;
}}
.danger {{
background: #f8d7da;
border: 1px solid #f5c6cb;
border-radius: 5px;
padding: 10px;
margin: 15px 0;
color: #721c24;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>osu! 密码重置</h1>
<p>Password Reset Request</p>
</div>
<div class="content">
<h2>你好 {username}</h2>
<p>我们收到了您的密码重置请求。如果这是您本人操作,请使用以下验证码重置密码:</p>
<div class="code">{code}</div>
<p>这个验证码将在 <strong>10 分钟后过期</strong>。</p>
<div class="danger">
<strong>⚠️ 安全提醒:</strong>
<ul>
<li>请不要与任何人分享这个验证码</li>
<li>如果您没有请求密码重置,请立即忽略这封邮件</li>
<li>验证码只能使用一次</li>
<li>建议设置一个强密码以保护您的账户安全</li>
</ul>
</div>
<p>如果您有任何问题,请联系我们的支持团队。</p>
<hr style="border: none; border-top: 1px solid #ddd; margin: 20px 0;">
<h3>Hello {username}!</h3>
<p>We received a request to reset your password. If this was you, please use the following verification code to reset your password:</p>
<p>This verification code will expire in <strong>10 minutes</strong>.</p>
<p><strong>Security Notice:</strong> Do not share this verification code with anyone. If you did not request a password reset, please ignore this email.</p>
</div>
<div class="footer">
<p>© 2025 g0v0! Private Server. 此邮件由系统自动发送,请勿回复。</p>
<p>This email was sent automatically, please do not reply.</p>
</div>
</div>
</body>
</html>
"""
msg.attach(MIMEText(html_content, 'html', 'utf-8'))
# 发送邮件
if not getattr(settings, 'enable_email_sending', True):
# 邮件发送功能禁用时只记录日志,不实际发送
logger.info(f"[Password Reset] Mock sending reset code to {email}: {code}")
return True
with smtplib.SMTP(self.email_service.smtp_server, self.email_service.smtp_port) as server:
if self.email_service.smtp_username and self.email_service.smtp_password:
server.starttls()
server.login(self.email_service.smtp_username, self.email_service.smtp_password)
server.send_message(msg)
logger.info(f"[Password Reset] Successfully sent reset code to {email}")
return True
except Exception as e:
logger.error(f"[Password Reset] Failed to send email: {e}")
return False
async def reset_password(
self,
email: str,
reset_code: str,
new_password: str,
ip_address: str,
redis: Redis
) -> Tuple[bool, str]:
"""
重置密码
Args:
email: 邮箱地址
reset_code: 重置验证码
new_password: 新密码
ip_address: 请求IP
redis: Redis连接
Returns:
Tuple[success, message]
"""
email = email.lower().strip()
reset_code = reset_code.strip()
async with with_db() as session:
# 从Redis获取验证码数据
reset_code_key = self._get_reset_code_key(email)
reset_data_str = await redis.get(reset_code_key)
if not reset_data_str:
return False, "验证码无效或已过期"
try:
reset_data = json.loads(reset_data_str)
except json.JSONDecodeError:
return False, "验证码数据格式错误"
# 验证验证码
if reset_data.get("reset_code") != reset_code:
return False, "验证码错误"
# 检查是否已使用
if reset_data.get("used", False):
return False, "验证码已使用"
# 验证邮箱匹配
if reset_data.get("email") != email:
return False, "邮箱地址不匹配"
# 查找用户
user_query = select(User).where(User.email == email)
user_result = await session.exec(user_query)
user = user_result.first()
if not user:
return False, "用户不存在"
if user.id is None:
return False, "用户ID无效"
# 验证用户ID匹配
if reset_data.get("user_id") != user.id:
return False, "用户信息不匹配"
# 密码强度检查
if len(new_password) < 6:
return False, "密码长度至少为6位"
try:
# 先标记验证码为已使用(在数据库操作之前)
reset_data["used"] = True
reset_data["used_at"] = datetime.now(UTC).isoformat()
# 保存用户ID用于日志记录
user_id = user.id
# 更新用户密码
password_hash = get_password_hash(new_password)
user.pw_bcrypt = password_hash # 使用正确的字段名称 pw_bcrypt 而不是 password_hash
# 提交数据库更改
await session.commit()
# 使该用户的所有现有令牌失效(使其他客户端登录失效)
tokens_deleted = await invalidate_user_tokens(session, user_id)
# 数据库操作成功后更新Redis状态
await redis.setex(reset_code_key, 300, json.dumps(reset_data)) # 保留5分钟用于日志记录
logger.info(f"[Password Reset] User {user_id} ({email}) successfully reset password from IP {ip_address}, invalidated {tokens_deleted} tokens")
return True, "密码重置成功,所有设备已被登出"
except Exception as e:
# 不要在异常处理中访问user.id可能触发数据库操作
user_id = reset_data.get("user_id", "未知")
logger.error(f"[Password Reset] Failed to reset password for user {user_id}: {e}")
await session.rollback()
# 数据库回滚时需要恢复Redis中的验证码状态
try:
# 恢复验证码为未使用状态
original_reset_data = {
"user_id": reset_data.get("user_id"),
"email": reset_data.get("email"),
"reset_code": reset_data.get("reset_code"),
"created_at": reset_data.get("created_at"),
"ip_address": reset_data.get("ip_address"),
"user_agent": reset_data.get("user_agent"),
"used": False # 恢复为未使用状态
}
# 计算剩余的TTL时间
created_at = datetime.fromisoformat(reset_data.get("created_at", ""))
elapsed = (datetime.now(UTC) - created_at).total_seconds()
remaining_ttl = max(0, 600 - int(elapsed)) # 600秒总过期时间
if remaining_ttl > 0:
await redis.setex(reset_code_key, remaining_ttl, json.dumps(original_reset_data))
logger.info(f"[Password Reset] Restored Redis state after database rollback for {email}")
else:
# 如果已经过期,直接删除
await redis.delete(reset_code_key)
logger.info(f"[Password Reset] Removed expired reset code after database rollback for {email}")
except Exception as redis_error:
logger.error(f"[Password Reset] Failed to restore Redis state after rollback: {redis_error}")
return False, "密码重置失败,请稍后重试"
async def get_reset_attempts_count(self, email: str, redis: Redis) -> int:
"""
获取邮箱的重置尝试次数(通过检查频率限制键)
Args:
email: 邮箱地址
redis: Redis连接
Returns:
尝试次数
"""
try:
rate_limit_key = self._get_rate_limit_key(email)
ttl = await redis.ttl(rate_limit_key)
return 1 if ttl > 0 else 0
except Exception as e:
logger.error(f"[Password Reset] Failed to get attempts count: {e}")
return 0
# 全局密码重置服务实例
password_reset_service = PasswordResetService()

View File

@@ -225,7 +225,7 @@ class RankingCacheService:
) -> None: ) -> None:
"""刷新排行榜缓存""" """刷新排行榜缓存"""
if self._refreshing: if self._refreshing:
logger.info( logger.debug(
f"Ranking cache refresh already in progress for {ruleset}:{type}" f"Ranking cache refresh already in progress for {ruleset}:{type}"
) )
return return
@@ -308,7 +308,7 @@ class RankingCacheService:
except Exception as e: except Exception as e:
logger.error(f"Error caching page {page} for {ruleset}:{type}: {e}") logger.error(f"Error caching page {page} for {ruleset}:{type}: {e}")
logger.info(f"Completed ranking cache refresh for {ruleset}:{type}") logger.debug(f"Completed ranking cache refresh for {ruleset}:{type}")
except Exception as e: except Exception as e:
logger.error(f"Ranking cache refresh failed for {ruleset}:{type}: {e}") logger.error(f"Ranking cache refresh failed for {ruleset}:{type}: {e}")
@@ -323,7 +323,7 @@ class RankingCacheService:
) -> None: ) -> None:
"""刷新地区排行榜缓存""" """刷新地区排行榜缓存"""
if self._refreshing: if self._refreshing:
logger.info( logger.debug(
f"Country ranking cache refresh already in progress for {ruleset}" f"Country ranking cache refresh already in progress for {ruleset}"
) )
return return

View File

@@ -0,0 +1,172 @@
"""add_password_reset_table
Revision ID: d103d442dc24
Revises: 0f96348cdfd2
Create Date: 2025-08-22 08:27:58.468119
"""
from __future__ import annotations
from collections.abc import Sequence
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
import sqlmodel
# revision identifiers, used by Alembic.
revision: str = "d103d442dc24"
down_revision: str | Sequence[str] | None = "0f96348cdfd2"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table("password_resets",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("user_id", sa.BigInteger(), nullable=False),
sa.Column("email", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("reset_code", sqlmodel.sql.sqltypes.AutoString(length=8), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("expires_at", sa.DateTime(), nullable=False),
sa.Column("is_used", sa.Boolean(), nullable=False),
sa.Column("used_at", sa.DateTime(), nullable=True),
sa.Column("ip_address", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column("user_agent", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.ForeignKeyConstraint(["user_id"], ["lazer_users.id"], ),
sa.PrimaryKeyConstraint("id")
)
op.create_index(op.f("ix_password_resets_email"), "password_resets", ["email"], unique=False)
op.create_index(op.f("ix_password_resets_user_id"), "password_resets", ["user_id"], unique=False)
op.drop_index(op.f("ix_two_factor_auth_id"), table_name="two_factor_auth")
op.drop_index(op.f("ix_two_factor_auth_user_id"), table_name="two_factor_auth")
op.drop_table("two_factor_auth")
op.drop_index(op.f("ix_user_ip_history_id"), table_name="user_ip_history")
op.drop_index(op.f("ix_user_ip_history_ip_address"), table_name="user_ip_history")
op.drop_index(op.f("ix_user_ip_history_user_id"), table_name="user_ip_history")
op.drop_table("user_ip_history")
op.drop_index(op.f("ix_session_verification_user_id"), table_name="session_verification")
op.drop_table("session_verification")
op.alter_column("beatmapsets", "nsfw",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column("beatmapsets", "spotlight",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column("beatmapsets", "video",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column("beatmapsets", "can_be_hyped",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column("beatmapsets", "discussion_locked",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column("beatmapsets", "storyboard",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column("beatmapsets", "download_disabled",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.drop_index(op.f("uq_user_achievement"), table_name="lazer_user_achievements")
op.alter_column("scores", "has_replay",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column("scores", "passed",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column("scores", "preserve",
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column("scores", "preserve",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column("scores", "passed",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column("scores", "has_replay",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.create_index(op.f("uq_user_achievement"), "lazer_user_achievements", ["user_id", "achievement_id"], unique=True)
op.alter_column("beatmapsets", "download_disabled",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column("beatmapsets", "storyboard",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column("beatmapsets", "discussion_locked",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column("beatmapsets", "can_be_hyped",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column("beatmapsets", "video",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column("beatmapsets", "spotlight",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column("beatmapsets", "nsfw",
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.create_table("session_verification",
sa.Column("id", mysql.BIGINT(), autoincrement=True, nullable=False),
sa.Column("user_id", mysql.BIGINT(), autoincrement=False, nullable=False),
sa.Column("session_id", mysql.VARCHAR(length=255), nullable=False),
sa.Column("ip_address", mysql.VARCHAR(length=45), nullable=False),
sa.Column("is_verified", mysql.TINYINT(display_width=1), autoincrement=False, nullable=False),
sa.Column("verified_at", mysql.DATETIME(), nullable=True),
sa.Column("expires_at", mysql.DATETIME(), nullable=False),
sa.ForeignKeyConstraint(["user_id"], ["lazer_users.id"], name=op.f("session_verification_ibfk_1")),
sa.PrimaryKeyConstraint("id"),
mysql_collate="utf8mb4_0900_ai_ci",
mysql_default_charset="utf8mb4",
mysql_engine="InnoDB"
)
op.create_index(op.f("ix_session_verification_user_id"), "session_verification", ["user_id"], unique=False)
op.create_table("user_ip_history",
sa.Column("id", mysql.BIGINT(), autoincrement=True, nullable=False),
sa.Column("user_id", mysql.BIGINT(), autoincrement=False, nullable=False),
sa.Column("ip_address", mysql.VARCHAR(length=45), nullable=False),
sa.Column("first_seen", mysql.DATETIME(), nullable=False),
sa.Column("last_seen", mysql.DATETIME(), nullable=False),
sa.Column("usage_count", mysql.INTEGER(), autoincrement=False, nullable=False),
sa.Column("is_trusted", mysql.TINYINT(display_width=1), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(["user_id"], ["lazer_users.id"], name=op.f("user_ip_history_ibfk_1")),
sa.PrimaryKeyConstraint("id"),
mysql_collate="utf8mb4_0900_ai_ci",
mysql_default_charset="utf8mb4",
mysql_engine="InnoDB"
)
op.create_index(op.f("ix_user_ip_history_user_id"), "user_ip_history", ["user_id"], unique=False)
op.create_index(op.f("ix_user_ip_history_ip_address"), "user_ip_history", ["ip_address"], unique=False)
op.create_index(op.f("ix_user_ip_history_id"), "user_ip_history", ["id"], unique=False)
op.create_table("two_factor_auth",
sa.Column("id", mysql.BIGINT(), autoincrement=True, nullable=False),
sa.Column("user_id", mysql.BIGINT(), autoincrement=False, nullable=False),
sa.Column("verification_code", mysql.VARCHAR(length=8), nullable=False),
sa.Column("expires_at", mysql.DATETIME(), nullable=False),
sa.Column("is_used", mysql.TINYINT(display_width=1), autoincrement=False, nullable=False),
sa.Column("ip_address", mysql.VARCHAR(length=45), nullable=False),
sa.Column("trigger_reason", mysql.VARCHAR(length=50), nullable=False),
sa.Column("verified_at", mysql.DATETIME(), nullable=True),
sa.ForeignKeyConstraint(["user_id"], ["lazer_users.id"], name=op.f("two_factor_auth_ibfk_1")),
sa.PrimaryKeyConstraint("id"),
mysql_collate="utf8mb4_0900_ai_ci",
mysql_default_charset="utf8mb4",
mysql_engine="InnoDB"
)
op.create_index(op.f("ix_two_factor_auth_user_id"), "two_factor_auth", ["user_id"], unique=False)
op.create_index(op.f("ix_two_factor_auth_id"), "two_factor_auth", ["id"], unique=False)
op.drop_index(op.f("ix_password_resets_user_id"), table_name="password_resets")
op.drop_index(op.f("ix_password_resets_email"), table_name="password_resets")
op.drop_table("password_resets")
# ### end Alembic commands ###