[Refactor] Init files Encoding & Refresh rating

- Now initial files can be other encoding types which are supported by JSON module
- Code refactor for refreshing all scores' rating
This commit is contained in:
Lost-MSth
2022-11-24 21:40:44 +08:00
parent e3d5c19569
commit 84b0e869a5
9 changed files with 81 additions and 164 deletions

View File

@@ -51,7 +51,7 @@ class SonglistParser:
self.parse()
@staticmethod
def is_available_file(song_id: str, file_name: str) -> list:
def is_available_file(song_id: str, file_name: str) -> bool:
'''判断文件是否允许被下载'''
if song_id not in SonglistParser.songs:
# songlist没有则只限制文件名

View File

@@ -90,10 +90,10 @@ class DatabaseInit:
self.c.execute('''insert into item values(?,?,?)''',
('anni5tix', 'anni5tix', 1))
with open(self.pack_path, 'r') as f:
with open(self.pack_path, 'rb') as f:
self.insert_purchase_item(load(f))
with open(self.single_path, 'r') as f:
with open(self.single_path, 'rb') as f:
self.insert_purchase_item(load(f))
self.c.execute(
@@ -105,7 +105,7 @@ class DatabaseInit:
def course_init(self) -> None:
'''初始化课题信息'''
courses = []
with open(self.course_path, 'r', encoding='utf-8') as f:
with open(self.course_path, 'rb') as f:
courses = load(f)
for i in courses:
x = Course(self.c).from_dict(i)

View File

@@ -0,0 +1,54 @@
from .sql import Connect, Sql
from .score import Score
class BaseOperation:
name: str = None
def __init__(self):
pass
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)
def run(self, *args, **kwargs):
raise NotImplementedError
class RefreshAllScoreRating(BaseOperation):
'''
刷新所有成绩的评分
'''
name = 'refresh_all_score_rating'
def run(self):
# 追求效率不用Song类尽量不用对象
with Connect() as c:
c.execute(
'''select song_id, rating_pst, rating_prs, rating_ftr, rating_byn from chart''')
x = c.fetchall()
songs = [i[0] for i in x]
c.execute(
f'''update best_score set rating=0 where song_id not in ({','.join(['?']*len(songs))})''', songs)
for i in x:
for j in range(0, 4):
defnum = -10 # 没在库里的全部当做定数-10
if i[j+1] is not None and i[j+1] > 0:
defnum = float(i[j+1]) / 10
c.execute('''select user_id, score from best_score where song_id=:a and difficulty=:b''', {
'a': i[0], 'b': j})
y = c.fetchall()
values = []
where_values = []
for k in y:
ptt = Score.calculate_rating(defnum, k[1])
if ptt < 0:
ptt = 0
values.append((ptt,))
where_values.append((k[0], i[0], j))
if values:
Sql(c).update_many('best_score', ['rating'], values, [
'user_id', 'song_id', 'difficulty'], where_values)

View File

@@ -103,7 +103,7 @@ class Score:
@staticmethod
def calculate_rating(defnum: int, score: int) -> float:
'''计算rating谱面定数小于等于0视为Unrank这里的defnum = Chart const'''
'''计算rating谱面定数小于等于0视为Unrank返回值会为-1这里的defnum = Chart const'''
if not defnum or defnum <= 0:
# 谱面没定数或者定数小于等于0被视作Unrank
return -1

View File

@@ -49,9 +49,9 @@ class Connect:
class Query:
'''查询参数类'''
def __init__(self, query_able: list = None, quzzy_query_able: list = None, sort_able: list = None) -> None:
def __init__(self, query_able: list = None, fuzzy_query_able: list = None, sort_able: list = None) -> None:
self.query_able: list = query_able # None表示不限制
self.quzzy_query_able: list = quzzy_query_able # None表示不限制
self.fuzzy_query_able: list = fuzzy_query_able # None表示不限制
self.sort_able: list = sort_able
self.__limit: int = -1
@@ -115,7 +115,7 @@ class Query:
def fuzzy_query_append(self, fuzzy_query: dict) -> None:
if not isinstance(fuzzy_query, dict):
raise InputError(api_error_code=-101)
if self.quzzy_query_able is not None and fuzzy_query and not set(fuzzy_query).issubset(set(self.quzzy_query_able)):
if self.fuzzy_query_able is not None and fuzzy_query and not set(fuzzy_query).issubset(set(self.fuzzy_query_able)):
raise InputError(api_error_code=-102)
if not self.__fuzzy_query:
self.__fuzzy_query = fuzzy_query
@@ -216,7 +216,7 @@ class Sql:
return ('insert into ' if insert_type is None else 'insert or ' + insert_type + ' into ') + table_name + ('(' + ','.join(key) + ')' if key else '') + ' values(' + ','.join(['?'] * (len(key) if value_len is None else value_len)) + ')'
@staticmethod
def get_update_sql(table_name: str, d: dict = {}, query: 'Query' = None) -> str:
def get_update_sql(table_name: str, d: dict = None, query: 'Query' = None):
if not d:
return None
sql_list = []
@@ -245,6 +245,13 @@ class Sql:
return sql, sql_list
@staticmethod
def get_update_many_sql(table_name: str, key: list = None, where_key: list = None) -> str:
'''拼接update语句这里不用Query类也不用字典请注意只返回sql语句'''
if not key or not where_key:
return None
return f"update {table_name} set {','.join([f'{k}=?' for k in key])} where {' and '.join([f'{k}=?' for k in where_key])}"
@staticmethod
def get_delete_sql(table_name: str, query: 'Query' = None):
'''拼接删除语句query中只有query和fuzzy_query会被处理'''
@@ -304,6 +311,13 @@ class Sql:
sql, sql_list = self.get_update_sql(table_name, d, query)
self.c.execute(sql, sql_list)
def update_many(self, table_name: str, key: list, value_list: list, where_key: list, where_value_list: list) -> None:
'''单表内行update多句sql语句这里不用Query类也不用字典要求值list长度一致有点像insert_many'''
if not key or not value_list or not where_key or not where_value_list or not len(key) == len(value_list[0]) or not len(where_key) == len(where_value_list[0]) or not len(value_list) == len(where_value_list):
raise ValueError
self.c.executemany(self.get_update_many_sql(
table_name, key, where_key), [x + y for x, y in zip(value_list, where_value_list)])
def delete(self, table_name: str, query: 'Query' = None) -> None:
'''删除query中只有query和fuzzy_query会被处理'''
sql, sql_list = self.get_delete_sql(table_name, query)

View File

@@ -1,56 +0,0 @@
from core.sql import Connect
def calculate_rating(defnum, score):
# 计算rating
if score >= 10000000:
ptt = defnum + 2
elif score < 9800000:
ptt = defnum + (score-9500000) / 300000
if ptt < 0 and defnum != -10:
ptt = 0
else:
ptt = defnum + 1 + (score-9800000) / 200000
return ptt
def refresh_all_score_rating():
# 刷新所有best成绩的rating
error = 'Unknown error.'
with Connect() as c:
c.execute(
'''select song_id, rating_pst, rating_prs, rating_ftr, rating_byn from chart''')
x = c.fetchall()
if x:
song_list = [i[0] for i in x]
with Connect() as c:
c.execute('''update best_score set rating=0 where song_id not in ({0})'''.format(
','.join(['?']*len(song_list))), tuple(song_list))
for i in x:
for j in range(0, 4):
defnum = -10 # 没在库里的全部当做定数-10
if i is not None:
defnum = float(i[j+1]) / 10
if defnum <= 0:
defnum = -10 # 缺少难度的当做定数-10
c.execute('''select user_id, score from best_score where song_id=:a and difficulty=:b''', {
'a': i[0], 'b': j})
y = c.fetchall()
if y:
for k in y:
ptt = calculate_rating(defnum, k[1])
if ptt < 0:
ptt = 0
c.execute('''update best_score set rating=:a where user_id=:b and song_id=:c and difficulty=:d''', {
'a': ptt, 'b': k[0], 'c': i[0], 'd': j})
error = None
else:
error = 'No song data.'
return error

View File

@@ -1,91 +0,0 @@
import os
from shutil import copy, copy2
from core.sql import Connect
from database.database_initialize import ARCAEA_SERVER_VERSION, main
from web.system import update_database
def try_rename(path, new_path):
# 尝试重命名文件,并尝试避免命名冲突,返回最终路径
final_path = new_path
if os.path.exists(new_path):
i = 1
while os.path.exists(new_path + str(i)):
i += 1
os.rename(path, new_path + str(i))
final_path = new_path + str(i)
else:
os.rename(path, new_path)
return final_path
def check_before_run(app):
# 运行前检查关键文件,返回布尔值,其实是因为有人经常忘了
f = True
if not os.path.exists('database'):
app.logger.warning('Folder `database` is missing.')
f = False
if not os.path.exists('database/songs'):
app.logger.warning('Folder `database/songs` is missing.')
f = False
if not os.path.exists('database/arcaea_database.db'):
app.logger.warning('File `database/arcaea_database.db` is missing.')
f = False
try:
app.logger.info(
'Try to new the file `database/arcaea_database.db`.')
main('./database/')
app.logger.info(
'Success to new the file `database/arcaea_database.db`.')
f = True
except:
app.logger.warning(
'Fail to new the file `database/arcaea_database.db`.')
else:
with Connect() as c:
try:
c.execute('''select value from config where id="version"''')
x = c.fetchone()
except:
x = None
# 数据库自动更新,不强求
if not x or x[0] != ARCAEA_SERVER_VERSION:
app.logger.warning(
'Maybe the file `database/arcaea_database.db` is an old version.')
try:
app.logger.info(
'Try to update the file `database/arcaea_database.db`.')
path = try_rename('database/arcaea_database.db',
'database/arcaea_database.db.bak')
try:
copy2(path, 'database/arcaea_database.db')
except:
copy(path, 'database/arcaea_database.db')
if os.path.isfile("database/old_arcaea_database.db"):
os.remove('database/old_arcaea_database.db')
try_rename('database/arcaea_database.db',
'database/old_arcaea_database.db')
main('./database/')
update_database()
app.logger.info(
'Success to update the file `database/arcaea_database.db`.')
except:
app.logger.warning(
'Fail to update the file `database/arcaea_database.db`.')
return f

View File

@@ -1,11 +1,10 @@
from core.error import ArcError
from core.present import UserPresent, UserPresentList
from core.sql import Connect
from core.user import UserOnline
from flask import Blueprint, request
from .auth import auth_required
from .func import arc_try, error_return, success_return
from .func import arc_try, success_return
bp = Blueprint('present', __name__, url_prefix='/present')

View File

@@ -1,9 +1,9 @@
import os
import time
import server.arcscore
from core.download import DownloadList, initialize_songfile
from core.init import FileChecker
from core.operation import RefreshAllScoreRating
from core.rank import RankList
from core.sql import Connect
from flask import Blueprint, flash, redirect, render_template, request, url_for
@@ -302,11 +302,8 @@ def update_song_hash():
@login_required
def update_song_rating():
# 更新所有分数的rating
error = server.arcscore.refresh_all_score_rating()
if error:
flash(error)
else:
flash('数据刷新成功 Success refresh data.')
RefreshAllScoreRating().run()
flash('数据刷新成功 Success refresh data.')
return render_template('web/updatedatabase.html')