Files
g0v0-server/sync_data.py
2025-07-28 13:09:55 +00:00

237 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Lazer API 数据同步脚本
用于将现有的 bancho.py 数据同步到新的 lazer 专用表中
"""
from __future__ import annotations
import logging
import os
import sys
import pymysql
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("data_sync.log"), logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
class DatabaseSyncer:
def __init__(self, host: str, port: int, user: str, password: str, database: str):
"""初始化数据库连接配置"""
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.connection = None
def connect(self):
"""连接到数据库"""
try:
self.connection = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
charset="utf8mb4",
autocommit=False,
)
logger.info(f"成功连接到数据库 {self.database}")
except Exception as e:
logger.error(f"连接数据库失败: {e}")
raise
def disconnect(self):
"""断开数据库连接"""
if self.connection:
self.connection.close()
logger.info("数据库连接已关闭")
def execute_sql_file(self, file_path: str):
"""执行 SQL 文件"""
if not os.path.exists(file_path):
logger.error(f"SQL 文件不存在: {file_path}")
return False
try:
with open(file_path, encoding="utf-8") as f:
sql_content = f.read()
# 分割SQL语句简单实现按分号分割
statements = [
stmt.strip() for stmt in sql_content.split(";") if stmt.strip()
]
cursor = self.connection.cursor()
for i, statement in enumerate(statements):
# 跳过注释和空语句
if statement.startswith("--") or not statement:
continue
try:
logger.info(f"执行第 {i + 1}/{len(statements)} 条SQL语句...")
cursor.execute(statement)
# 如果是SELECT语句显示结果
if statement.strip().upper().startswith("SELECT"):
results = cursor.fetchall()
if results:
logger.info(f"查询结果: {results}")
except Exception as e:
logger.error(f"执行SQL语句失败: {statement[:100]}...")
logger.error(f"错误信息: {e}")
# 继续执行其他语句
continue
self.connection.commit()
cursor.close()
logger.info(f"成功执行SQL文件: {file_path}")
return True
except Exception as e:
logger.error(f"执行SQL文件失败: {e}")
if self.connection:
self.connection.rollback()
return False
def check_tables_exist(self, tables: list) -> dict:
"""检查表是否存在"""
results = {}
cursor = self.connection.cursor()
for table in tables:
try:
cursor.execute(f"SHOW TABLES LIKE '{table}'")
exists = cursor.fetchone() is not None
results[table] = exists
logger.info(f"'{table}' {'存在' if exists else '不存在'}")
except Exception as e:
logger.error(f"检查表 '{table}' 时出错: {e}")
results[table] = False
cursor.close()
return results
def get_table_count(self, table: str) -> int:
"""获取表的记录数"""
try:
cursor = self.connection.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table}")
result = cursor.fetchone()
count = result[0] if result else 0
cursor.close()
return count
except Exception as e:
logger.error(f"获取表 '{table}' 记录数失败: {e}")
return -1
def main():
"""主函数"""
print("Lazer API 数据同步工具")
print("=" * 50)
# 数据库配置
db_config = {
"host": input("数据库主机 [localhost]: ").strip() or "localhost",
"port": int(input("数据库端口 [3306]: ").strip() or "3306"),
"user": input("数据库用户名: ").strip(),
"password": input("数据库密码: ").strip(),
"database": input("数据库名称: ").strip(),
}
syncer = DatabaseSyncer(**db_config)
try:
# 连接数据库
syncer.connect()
# 检查必要的原始表是否存在
required_tables = ["users", "stats"]
table_status = syncer.check_tables_exist(required_tables)
missing_tables = [table for table, exists in table_status.items() if not exists]
if missing_tables:
logger.error(f"缺少必要的原始表: {missing_tables}")
return
# 显示原始表的记录数
for table in required_tables:
count = syncer.get_table_count(table)
logger.info(f"'{table}' 当前有 {count} 条记录")
# 确认是否执行同步
print("\n准备执行数据同步...")
print("这将会:")
print("1. 创建 lazer 专用表结构 (如果不存在)")
print("2. 从现有表同步数据到新表")
print("3. 不会修改或删除现有数据")
confirm = input("\n是否继续? (y/N): ").strip().lower()
if confirm != "y":
print("操作已取消")
return
# 执行表结构创建
migrations_dir = os.path.join(os.path.dirname(__file__), "migrations_old")
print("\n步骤 1: 创建表结构...")
add_fields_sql = os.path.join(migrations_dir, "add_missing_fields.sql")
if os.path.exists(add_fields_sql):
success = syncer.execute_sql_file(add_fields_sql)
if not success:
logger.error("创建表结构失败")
return
else:
logger.warning(f"表结构文件不存在: {add_fields_sql}")
# 执行数据同步
print("\n步骤 2: 同步数据...")
sync_sql = os.path.join(migrations_dir, "sync_legacy_data.sql")
if os.path.exists(sync_sql):
success = syncer.execute_sql_file(sync_sql)
if not success:
logger.error("数据同步失败")
return
else:
logger.error(f"同步脚本不存在: {sync_sql}")
return
# 显示同步后的统计信息
print("\n步骤 3: 同步完成统计...")
lazer_tables = [
"lazer_user_profiles",
"lazer_user_countries",
"lazer_user_statistics",
"lazer_user_kudosu",
"lazer_user_counts",
]
for table in lazer_tables:
count = syncer.get_table_count(table)
if count >= 0:
logger.info(f"'{table}' 现在有 {count} 条记录")
print("\n数据同步完成!")
except KeyboardInterrupt:
print("\n\n操作被用户中断")
except Exception as e:
logger.error(f"同步过程中发生错误: {e}")
finally:
syncer.disconnect()
if __name__ == "__main__":
main()