mirror of
https://github.com/Lost-MSth/Arcaea-server.git
synced 2025-12-14 08:06:23 +08:00
369 lines
15 KiB
Python
369 lines
15 KiB
Python
import os
|
|
import sys
|
|
from importlib import import_module
|
|
from json import load
|
|
from shutil import copy, copy2
|
|
from time import time
|
|
from traceback import format_exc
|
|
|
|
from core.bundle import BundleParser
|
|
from core.config_manager import Config
|
|
from core.constant import ARCAEA_DATABASE_VERSION, ARCAEA_LOG_DATBASE_VERSION
|
|
from core.course import Course
|
|
from core.download import DownloadList
|
|
from core.purchase import Purchase
|
|
from core.sql import (Connect, DatabaseMigrator, LogDatabaseMigrator,
|
|
MemoryDatabase)
|
|
from core.user import UserRegister
|
|
from core.util import try_rename
|
|
from core.world import MapParser
|
|
|
|
|
|
class DatabaseInit:
    '''Builds the main database: creates the tables, then inserts the initial rows.'''

    def __init__(self, db_path: str = Config.SQLITE_DATABASE_PATH, init_folder_path: str = Config.DATABASE_INIT_PATH) -> None:
        self.init_folder_path = init_folder_path
        self.db_path = db_path
        # Object yielded by `with Connect(...)`; assigned in `init`.
        self.c = None
        # `InitData` taken from the `arc_data` module; assigned in `init`.
        self.init_data = None

    def _init_file(self, file_name: str) -> str:
        '''Return the path of `file_name` inside the init folder.'''
        return os.path.join(self.init_folder_path, file_name)

    @property
    def sql_path(self) -> str:
        '''Path of the SQL script that creates the tables.'''
        return self._init_file('tables.sql')

    @property
    def course_path(self) -> str:
        '''Path of the JSON file listing the courses.'''
        return self._init_file('courses.json')

    @property
    def pack_path(self) -> str:
        '''Path of the JSON file listing the packs.'''
        return self._init_file('packs.json')

    @property
    def single_path(self) -> str:
        '''Path of the JSON file listing the singles.'''
        return self._init_file('singles.json')

    def table_init(self) -> None:
        '''Initialize the database structure and record the database version.'''
        with open(self.sql_path, 'r', encoding='utf-8') as script:
            self.c.executescript(script.read())
        version_params = {'a': ARCAEA_DATABASE_VERSION}
        self.c.execute('''insert into config values("version", :a);''', version_params)

    def character_init(self) -> None:
        '''Initialize the character (partner) rows and their core items.'''
        data = self.init_data
        uncappable_ids = data.char_core.keys()
        insert_sql = '''insert into character values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'''

        for char_id in range(len(data.char)):
            # Characters listed in `char_core` can be uncapped and go up to level 30.
            max_level, uncapable = (30, 1) if char_id in uncappable_ids else (20, 0)
            skill_requires_uncap = 1 if char_id == 2 else 0
            row = (
                char_id,
                data.char[char_id],
                max_level,
                data.frag1[char_id],
                data.prog1[char_id],
                data.overdrive1[char_id],
                data.frag20[char_id],
                data.prog20[char_id],
                data.overdrive20[char_id],
                data.frag30[char_id],
                data.prog30[char_id],
                data.overdrive30[char_id],
                data.skill_id[char_id],
                data.skill_unlock_level[char_id],
                skill_requires_uncap,
                data.skill_id_uncap[char_id],
                data.char_type[char_id],
                uncapable,
            )
            self.c.execute(insert_sql, row)

        # One extra character with the fixed id 99 is inserted outside the loop.
        shirahime_row = (99, 'shirahime', 38, 33, 28, 66, 58, 50, 66, 58, 50,
                         'frags_preferred_song', 0, 0, '', 0)
        self.c.execute(
            '''insert into character values(?,?,20,?,?,?,?,?,?,?,?,?,?,?,?,?,?,0)''', shirahime_row)

        for char_id in data.char_core:
            core_rows = [(char_id, core['core_id'], core['amount'])
                         for core in data.char_core[char_id]]
            self.c.executemany(
                '''insert into char_item values(?,?,'core',?)''', core_rows)

    def insert_purchase_item(self, purchases: list):
        '''Insert every purchase description of `purchases` (used for singles and packs).'''
        for purchase in purchases:
            Purchase(self.c).from_dict(purchase).insert_all()

    def item_init(self) -> None:
        '''Initialize the item rows, then the packs and singles.'''
        data = self.init_data

        core_rows = [(item_id,) for item_id in data.cores]
        self.c.executemany('''insert into item values(?,"core",1)''', core_rows)

        world_song_rows = [(item_id,) for item_id in data.world_songs]
        self.c.executemany(
            '''insert into item values(?,"world_song",1)''', world_song_rows)

        world_unlock_rows = [(item_id,) for item_id in data.world_unlocks]
        self.c.executemany(
            '''insert into item values(?,"world_unlock",1)''', world_unlock_rows)

        course_banner_rows = [(item_id,) for item_id in data.course_banners]
        self.c.executemany(
            '''insert into item values(?,"course_banner",1)''', course_banner_rows)

        # These items use the same text for both of their first two columns.
        for item_id in ('fragment', 'memory', 'anni5tix', 'pick_ticket'):
            self.c.execute('''insert into item values(?,?,?)''',
                           (item_id, item_id, 1))

        for json_path in (self.pack_path, self.single_path):
            with open(json_path, 'rb') as f:
                self.insert_purchase_item(load(f))

        # Reward song of the beginner mission.
        self.c.execute('''insert into item values(?,?,?)''',
                       ('innocence', 'world_song', 1))

    def course_init(self) -> None:
        '''Initialize the courses from the courses JSON file.'''
        with open(self.course_path, 'rb') as f:
            course_dicts = load(f)
        for course_dict in course_dicts:
            Course(self.c).from_dict(course_dict).insert_all()

    def role_power_init(self) -> None:
        '''Initialize the roles, the powers and the role-to-power links.'''
        data = self.init_data

        role_rows = [(data.role[idx], data.role_caption[idx])
                     for idx in range(len(data.role))]
        self.c.executemany('''insert into role values(?,?)''', role_rows)

        power_rows = [(data.power[idx], data.power_caption[idx])
                      for idx in range(len(data.power))]
        self.c.executemany('''insert into power values(?,?)''', power_rows)

        for role_id in data.role_power:
            link_rows = [(role_id, power_id)
                         for power_id in data.role_power[role_id]]
            self.c.executemany(
                '''insert into role_power values(?,?)''', link_rows)

    def admin_init(self) -> None:
        '''Initialize the test account and give it the "admin" role.'''
        admin = UserRegister(self.c)
        admin.user_code = '123456789'
        admin.user_id = 2000000
        admin.name = 'admin'
        admin.email = 'admin@admin.com'
        now = int(time() * 1000)  # join date in milliseconds

        admin._insert_user_char()

        user_params = {
            'user_code': admin.user_code,
            'user_id': admin.user_id,
            'join_date': now,
            'name': admin.name,
            'password': '41e5653fc7aeb894026d6bb7b2db7f65902b454945fa8fd65a6327047b5277fb',
            'memories': 114514,
            'email': admin.email,
        }
        self.c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
        character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled, current_map, ticket, prog_boost, email)
        values(:user_id, :name, :password, :join_date, :user_code, 0, 0, 0, 0, 0, 0, -1, 0, '', :memories, 0, :email)
        ''', user_params)

        self.c.execute(
            '''insert into user_role values(?, "admin")''', (admin.user_id,))

    def init(self) -> None:
        '''Load `InitData` from the init folder and run every initialization step in order.'''
        # The init folder is appended to `sys.path` so `arc_data` can be imported by name.
        init_module_dir = os.path.join(sys.path[0], self.init_folder_path)
        sys.path.append(init_module_dir)
        self.init_data = import_module('arc_data').InitData

        steps = (
            self.table_init,
            self.character_init,
            self.item_init,
            self.course_init,
            self.role_power_init,
            self.admin_init,
        )
        with Connect(self.db_path) as conn:
            self.c = conn
            for step in steps:
                step()
|
|
|
|
|
|
class LogDatabaseInit:
    '''Builds the log database: creates its tables and records its version.'''

    def __init__(self, db_path: str = Config.SQLITE_LOG_DATABASE_PATH, init_folder_path: str = Config.DATABASE_INIT_PATH) -> None:
        self.db_path = db_path
        self.init_folder_path = init_folder_path
        # Object yielded by `with Connect(...)`; assigned in `init`.
        self.c = None

    @property
    def sql_path(self) -> str:
        '''Path of the SQL script that creates the log tables.'''
        return os.path.join(self.init_folder_path, 'log_tables.sql')

    def table_init(self) -> None:
        '''Initialize the database structure and record the log database version.'''
        # Read the script as UTF-8 explicitly, like `DatabaseInit.table_init` does,
        # instead of depending on the platform's default text encoding.
        with open(self.sql_path, 'r', encoding='utf-8') as f:
            self.c.executescript(f.read())
        self.c.execute(
            '''insert into cache values("version", :a, -1);''', {'a': ARCAEA_LOG_DATBASE_VERSION})

    def init(self) -> None:
        '''Open the log database and create its tables.'''
        with Connect(self.db_path) as c:
            self.c = c
            self.table_init()
|
|
|
|
|
|
class DeletedDatabaseInit(DatabaseInit):
    '''Initializer for the database at `Config.SQLITE_DATABASE_DELETED_PATH`.

    Only the table structure is created; none of the data-filling steps run.
    '''

    def __init__(self, db_path: str = Config.SQLITE_DATABASE_DELETED_PATH) -> None:
        # The init folder keeps the default declared by `DatabaseInit`.
        super().__init__(db_path=db_path)

    def init(self) -> None:
        '''Open the database and run `table_init` only.'''
        with Connect(self.db_path) as conn:
            self.c = conn
            self.table_init()
|
|
|
|
|
|
class FileChecker:
    '''File checking and initialization class.'''

    def __init__(self, logger=None):
        '''
        Parameters:
            logger: object providing `info`, `warning` and `error` methods.
                When omitted, a standard-library logger is used so that the
                checks below never call a method on `None`.
        '''
        if logger is None:
            # Function-scope import: this module does not import `logging` at the top.
            import logging
            logger = logging.getLogger(__name__)
        self.logger = logger

    def check_file(self, file_path: str) -> bool:
        '''Return whether `file_path` is an existing file; log a warning when it is not.'''
        exists = os.path.isfile(file_path)
        if not exists:
            self.logger.warning(f'File `{file_path}` is missing.')
        return exists

    def check_folder(self, folder_path: str) -> bool:
        '''Return whether `folder_path` is an existing folder; log a warning when it is not.'''
        exists = os.path.isdir(folder_path)
        if not exists:
            self.logger.warning(f'Folder `{folder_path}` is missing.')
        return exists

    def _check_update_database_log(self) -> bool:
        '''Create the log database if it is missing, otherwise update it when its version differs.

        Returns False when the creation or the update raised, True otherwise.
        '''
        if not self.check_file(Config.SQLITE_LOG_DATABASE_PATH):
            # Create a new log database.
            try:
                self.logger.info(
                    f'Try to new the file {Config.SQLITE_LOG_DATABASE_PATH}')
                LogDatabaseInit().init()
                self.logger.info(
                    f'Success to new the file {Config.SQLITE_LOG_DATABASE_PATH}')
            except Exception:
                self.logger.error(format_exc())
                self.logger.error(
                    f'Failed to new the file {Config.SQLITE_LOG_DATABASE_PATH}')
                return False
        else:
            # Check for updates.
            with Connect(Config.SQLITE_LOG_DATABASE_PATH) as c:
                try:
                    x = c.execute(
                        '''select value from cache where key="version"''').fetchone()
                except Exception:
                    # A failing version query is treated like a missing version.
                    x = None
            # The probing connection is closed before the migrator touches the file.
            if not x or x[0] != ARCAEA_LOG_DATBASE_VERSION:
                self.logger.warning(
                    f'Maybe the file `{Config.SQLITE_LOG_DATABASE_PATH}` is an old version.')
                try:
                    self.logger.info(
                        f'Try to update the file `{Config.SQLITE_LOG_DATABASE_PATH}`')
                    self.update_log_database()
                    self.logger.info(
                        f'Success to update the file `{Config.SQLITE_LOG_DATABASE_PATH}`')
                except Exception:
                    self.logger.error(format_exc())
                    self.logger.error(
                        f'Failed to update the file `{Config.SQLITE_LOG_DATABASE_PATH}`')
                    return False

        return True

    def _check_update_database_main(self, db_path=Config.SQLITE_DATABASE_PATH, init_class=DatabaseInit) -> bool:
        '''Create the database at `db_path` if it is missing, otherwise try to update it.

        Parameters:
            db_path: database file to check.
            init_class: class whose `init()` builds a fresh database.

        Returns False only when creating a missing database raised. A failed
        update is logged but still returns True.
        '''
        # NOTE(review): `init_class` is instantiated without arguments, so it writes
        # to its own default path; `db_path` has to match that default.
        if not self.check_file(db_path):
            # Create a new database.
            try:
                self.logger.info(f'Try to new the file `{db_path}`.')
                init_class().init()
                self.logger.info(f'Success to new the file `{db_path}`.')
            except Exception:
                self.logger.error(format_exc())
                self.logger.warning(f'Failed to new the file `{db_path}`.')
                return False
        else:
            # Check for updates.
            with Connect(db_path) as c:
                try:
                    c.execute('''select value from config where id="version"''')
                    x = c.fetchone()
                except Exception:
                    # A failing version query is treated like a missing version.
                    x = None
            # The probing connection is closed before the file is renamed below.
            # Automatic database update, best effort only.
            if not x or x[0] != ARCAEA_DATABASE_VERSION:
                self.logger.warning(
                    f'Maybe the file `{db_path}` is an old version. Version: {x[0] if x else "None"}')
                try:
                    self.logger.info(
                        f'Try to update the file `{db_path}` to version {ARCAEA_DATABASE_VERSION}.')

                    if not os.path.isdir(Config.SQLITE_DATABASE_BACKUP_FOLDER_PATH):
                        os.makedirs(Config.SQLITE_DATABASE_BACKUP_FOLDER_PATH)

                    # Move the current file into the backup folder, then copy it back
                    # so that a working copy exists at `db_path` again.
                    backup_path = try_rename(db_path, os.path.join(
                        Config.SQLITE_DATABASE_BACKUP_FOLDER_PATH, os.path.split(db_path)[-1] + '.bak'))

                    try:
                        copy2(backup_path, db_path)
                    except Exception:
                        # Fall back to a copy that does not preserve file metadata.
                        copy(backup_path, db_path)

                    # The working copy becomes the "old" database next to `db_path`.
                    temp_path = os.path.join(
                        *os.path.split(db_path)[:-1], 'old_arcaea_database.db')
                    if os.path.isfile(temp_path):
                        os.remove(temp_path)

                    try_rename(db_path, temp_path)

                    # Build a fresh database, then migrate the old data into it.
                    init_class().init()
                    self.update_database(temp_path, db_path)

                    self.logger.info(
                        f'Success to update the file `{db_path}`.')

                except Exception:
                    self.logger.error(format_exc())
                    self.logger.warning(
                        f'Fail to update the file `{db_path}`.')

        return True

    def check_update_database(self) -> bool:
        '''Check the main, log and deleted databases in that order, stopping at the first failure.'''
        return self._check_update_database_main() and self._check_update_database_log() and self._check_update_database_main(Config.SQLITE_DATABASE_DELETED_PATH, DeletedDatabaseInit)

    @staticmethod
    def update_database(old_path: str, new_path: str = Config.SQLITE_DATABASE_PATH) -> None:
        '''Update the database from the old file, then delete the old file.

        Nothing happens unless both `old_path` and `new_path` are existing files.
        '''
        if os.path.isfile(old_path) and os.path.isfile(new_path):
            DatabaseMigrator(old_path, new_path).update_database()
            os.remove(old_path)

    @staticmethod
    def update_log_database(old_path: str = Config.SQLITE_LOG_DATABASE_PATH) -> None:
        '''Update the log database in place; nothing happens when the file does not exist.'''
        if os.path.isfile(old_path):
            LogDatabaseMigrator(old_path).update_database()

    def check_song_file(self) -> bool:
        '''Check the song folder and initialize the song cache.

        Returns False when the folder is missing or the initialization raised.
        '''
        f = self.check_folder(Config.SONG_FILE_FOLDER_PATH)
        self.logger.info("Initialize song data...")
        try:
            DownloadList.initialize_cache()
            if not Config.SONG_FILE_HASH_PRE_CALCULATE:
                self.logger.info('Song file hash pre-calculate is disabled.')
        except Exception:
            self.logger.error(format_exc())
            self.logger.warning('Song data initialization error!')
            f = False
        return f

    def check_content_bundle(self) -> bool:
        '''Check the content bundle folder and initialize the bundle cache.

        Returns False when the folder is missing or the initialization raised.
        '''
        f = self.check_folder(Config.CONTENT_BUNDLE_FOLDER_PATH)
        self.logger.info("Initialize content bundle data...")
        try:
            BundleParser()
        except Exception:
            self.logger.error(format_exc())
            self.logger.warning('Content bundle data initialization error!')
            f = False
        return f

    def check_world_map(self) -> bool:
        '''Check the world map folder and initialize the world map cache.

        Returns False when the folder is missing or the initialization raised.
        '''
        f = self.check_folder(Config.WORLD_MAP_FOLDER_PATH)
        self.logger.info("Initialize world map data...")
        try:
            MapParser()
        except Exception:
            self.logger.error(format_exc())
            self.logger.warning('World map data initialization error!')
            f = False
        return f

    def check_before_run(self) -> bool:
        '''Run the pre-start checks and return a boolean.

        The checks run in order and stop at the first one that returns False.
        '''
        MemoryDatabase()  # initialize the in-memory database
        return self.check_song_file() and self.check_content_bundle() and self.check_update_database() and self.check_world_map()
|