use uv & make lint happy

This commit is contained in:
MingxuanGame
2025-07-23 18:03:30 +08:00
parent c0246440f3
commit 7b5a50493a
17 changed files with 2117 additions and 692 deletions

View File

@@ -4,25 +4,24 @@ Lazer API 数据同步脚本
用于将现有的 bancho.py 数据同步到新的 lazer 专用表中
"""
from __future__ import annotations
import logging
import os
import sys
import pymysql
from typing import Optional
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data_sync.log'),
logging.StreamHandler(sys.stdout)
]
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("data_sync.log"), logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
class DatabaseSyncer:
def __init__(self, host: str, port: int, user: str, password: str, database: str):
"""初始化数据库连接配置"""
@@ -32,7 +31,7 @@ class DatabaseSyncer:
self.password = password
self.database = database
self.connection = None
def connect(self):
"""连接到数据库"""
try:
@@ -42,72 +41,74 @@ class DatabaseSyncer:
user=self.user,
password=self.password,
database=self.database,
charset='utf8mb4',
autocommit=False
charset="utf8mb4",
autocommit=False,
)
logger.info(f"成功连接到数据库 {self.database}")
except Exception as e:
logger.error(f"连接数据库失败: {e}")
raise
def disconnect(self):
"""断开数据库连接"""
if self.connection:
self.connection.close()
logger.info("数据库连接已关闭")
def execute_sql_file(self, file_path: str):
"""执行 SQL 文件"""
if not os.path.exists(file_path):
logger.error(f"SQL 文件不存在: {file_path}")
return False
try:
with open(file_path, 'r', encoding='utf-8') as f:
with open(file_path, encoding="utf-8") as f:
sql_content = f.read()
# 分割SQL语句简单实现按分号分割
statements = [stmt.strip() for stmt in sql_content.split(';') if stmt.strip()]
statements = [
stmt.strip() for stmt in sql_content.split(";") if stmt.strip()
]
cursor = self.connection.cursor()
for i, statement in enumerate(statements):
# 跳过注释和空语句
if statement.startswith('--') or not statement:
if statement.startswith("--") or not statement:
continue
try:
logger.info(f"执行第 {i+1}/{len(statements)} 条SQL语句...")
logger.info(f"执行第 {i + 1}/{len(statements)} 条SQL语句...")
cursor.execute(statement)
# 如果是SELECT语句显示结果
if statement.strip().upper().startswith('SELECT'):
if statement.strip().upper().startswith("SELECT"):
results = cursor.fetchall()
if results:
logger.info(f"查询结果: {results}")
except Exception as e:
logger.error(f"执行SQL语句失败: {statement[:100]}...")
logger.error(f"错误信息: {e}")
# 继续执行其他语句
continue
self.connection.commit()
cursor.close()
logger.info(f"成功执行SQL文件: {file_path}")
return True
except Exception as e:
logger.error(f"执行SQL文件失败: {e}")
if self.connection:
self.connection.rollback()
return False
def check_tables_exist(self, tables: list) -> dict:
"""检查表是否存在"""
results = {}
cursor = self.connection.cursor()
for table in tables:
try:
cursor.execute(f"SHOW TABLES LIKE '{table}'")
@@ -117,10 +118,10 @@ class DatabaseSyncer:
except Exception as e:
logger.error(f"检查表 '{table}' 时出错: {e}")
results[table] = False
cursor.close()
return results
def get_table_count(self, table: str) -> int:
"""获取表的记录数"""
try:
@@ -134,57 +135,58 @@ class DatabaseSyncer:
logger.error(f"获取表 '{table}' 记录数失败: {e}")
return -1
def main():
"""主函数"""
print("Lazer API 数据同步工具")
print("=" * 50)
# 数据库配置
db_config = {
'host': input("数据库主机 [localhost]: ").strip() or 'localhost',
'port': int(input("数据库端口 [3306]: ").strip() or '3306'),
'user': input("数据库用户名: ").strip(),
'password': input("数据库密码: ").strip(),
'database': input("数据库名称: ").strip()
"host": input("数据库主机 [localhost]: ").strip() or "localhost",
"port": int(input("数据库端口 [3306]: ").strip() or "3306"),
"user": input("数据库用户名: ").strip(),
"password": input("数据库密码: ").strip(),
"database": input("数据库名称: ").strip(),
}
syncer = DatabaseSyncer(**db_config)
try:
# 连接数据库
syncer.connect()
# 检查必要的原始表是否存在
required_tables = ['users', 'stats']
required_tables = ["users", "stats"]
table_status = syncer.check_tables_exist(required_tables)
missing_tables = [table for table, exists in table_status.items() if not exists]
if missing_tables:
logger.error(f"缺少必要的原始表: {missing_tables}")
return
# 显示原始表的记录数
for table in required_tables:
count = syncer.get_table_count(table)
logger.info(f"'{table}' 当前有 {count} 条记录")
# 确认是否执行同步
print("\n准备执行数据同步...")
print("这将会:")
print("1. 创建 lazer 专用表结构 (如果不存在)")
print("2. 从现有表同步数据到新表")
print("3. 不会修改或删除现有数据")
confirm = input("\n是否继续? (y/N): ").strip().lower()
if confirm != 'y':
if confirm != "y":
print("操作已取消")
return
# 执行表结构创建
migrations_dir = os.path.join(os.path.dirname(__file__), 'migrations')
migrations_dir = os.path.join(os.path.dirname(__file__), "migrations")
print("\n步骤 1: 创建表结构...")
add_fields_sql = os.path.join(migrations_dir, 'add_missing_fields.sql')
add_fields_sql = os.path.join(migrations_dir, "add_missing_fields.sql")
if os.path.exists(add_fields_sql):
success = syncer.execute_sql_file(add_fields_sql)
if not success:
@@ -192,10 +194,10 @@ def main():
return
else:
logger.warning(f"表结构文件不存在: {add_fields_sql}")
# 执行数据同步
print("\n步骤 2: 同步数据...")
sync_sql = os.path.join(migrations_dir, 'sync_legacy_data.sql')
sync_sql = os.path.join(migrations_dir, "sync_legacy_data.sql")
if os.path.exists(sync_sql):
success = syncer.execute_sql_file(sync_sql)
if not success:
@@ -204,24 +206,24 @@ def main():
else:
logger.error(f"同步脚本不存在: {sync_sql}")
return
# 显示同步后的统计信息
print("\n步骤 3: 同步完成统计...")
lazer_tables = [
'lazer_user_profiles',
'lazer_user_countries',
'lazer_user_statistics',
'lazer_user_kudosu',
'lazer_user_counts'
"lazer_user_profiles",
"lazer_user_countries",
"lazer_user_statistics",
"lazer_user_kudosu",
"lazer_user_counts",
]
for table in lazer_tables:
count = syncer.get_table_count(table)
if count >= 0:
logger.info(f"'{table}' 现在有 {count} 条记录")
print("\n数据同步完成!")
except KeyboardInterrupt:
print("\n\n操作被用户中断")
except Exception as e:
@@ -229,5 +231,6 @@ def main():
finally:
syncer.disconnect()
if __name__ == "__main__":
main()