Code refactoring

- Code refactoring mainly for API
- Delete a useless option in `setting.py`
- Change some constants in Link Play mode
This commit is contained in:
Lost-MSth
2022-07-06 22:07:00 +08:00
parent 9d746eab2d
commit af3e91b3e8
43 changed files with 780 additions and 1867 deletions

View File

@@ -0,0 +1,134 @@
from hashlib import sha256
from os import urandom
from time import time
from .error import NoAccess, NoData, UserBan
from .user import UserOnline
class Power:
def __init__(self, c=None):
self.c = c
self.power_id: int = None
self.power_name: str = None
self.caption: str = None
@classmethod
def from_dict(cls, d: dict, c=None) -> 'Power':
p = cls(c)
p.power_id = d['power_id']
p.power_name = d['power_name']
p.caption = d['caption']
return p
def select_from_name(self, power_name: str) -> 'Power':
pass
class Role:
def __init__(self, c=None):
self.c = c
self.role_id: int = None
self.role_name: str = None
self.caption: str = None
self.powers: list = None
def has_power(self, power_name: str) -> bool:
'''判断role是否有power'''
for i in self.powers:
if i.power_name == power_name:
return True
return False
def select_from_id(self, role_id: int = None) -> 'Role':
'''用role_id查询role'''
if role_id is not None:
self.role_id = role_id
self.c.execute('''select role_name, caption from role where role_id = :a''',
{'a': self.role_id})
x = self.c.fetchone()
if x is None:
raise NoData('The role `%s` does not exist.' %
self.role_id, api_error_code=-200)
self.role_name = x[0]
self.caption = x[1]
return self
def select_powers(self) -> None:
'''查询role的全部powers'''
self.powers = []
self.c.execute('''select * from power where power_id in (select power_id from role_power where role_id=:a)''', {
'a': self.role_id})
x = self.c.fetchall()
for i in x:
self.powers.append(Power.from_dict(
{'power_id': i[0], 'power_name': i[1], 'caption': i[2]}, self.c))
class APIUser(UserOnline):
def __init__(self, c=None, user_id=None) -> None:
super().__init__(c, user_id)
self.api_token: str = None
self.role: 'Role' = None
self.ip: str = None
def select_role(self) -> None:
'''查询user的role'''
self.c.execute('''select role_id from user_role where user_id = :a''',
{'a': self.user_id})
x = self.c.fetchone()
self.role = Role(self.c)
if x is None:
# 默认role为user
self.role.role_id = 1
else:
self.role.role_id = int(x[0])
self.role.select_from_id()
def select_role_and_powers(self) -> None:
'''查询user的role以及role的powers'''
self.select_role()
self.role.select_powers()
def select_user_id_from_api_token(self, api_token: str = None) -> None:
if api_token is not None:
self.api_token = api_token
self.c.execute('''select user_id from api_login where token = :token''', {
'token': self.api_token})
x = self.c.fetchone()
if x is None:
raise NoAccess('No token', api_error_code=-1)
self.user_id = x[0]
def logout(self) -> None:
self.c.execute(
'''delete from api_login where user_id=?''', (self.user_id,))
def login(self, name: str = None, password: str = None, ip: str = None) -> None:
if name is not None:
self.name = name
if password is not None:
self.password = password
if ip is not None:
self.ip = ip
self.c.execute('''select user_id, password from user where name = :a''', {
'a': self.name})
x = self.c.fetchone()
if x is None:
raise NoData('The user `%s` does not exist.' %
self.name, api_error_code=-201)
if x[1] == '':
raise UserBan('The user `%s` is banned.' % self.name)
if self.hash_pwd != x[1]:
raise NoAccess('The password is incorrect.', api_error_code=-201)
self.user_id = x[0]
now = int(time() * 1000)
self.token = sha256(
(str(self.user_id) + str(now)).encode("utf8") + urandom(8)).hexdigest()
self.logout()
self.c.execute('''insert into api_login values(?,?,?,?)''',
(self.user_id, self.token, now, self.ip))

View File

@@ -120,7 +120,6 @@ class Character:
def skill_id_displayed(self) -> str:
return None
@property
def uncap_cores_to_dict(self):
return [x.to_dict() for x in self.uncap_cores]
@@ -210,13 +209,12 @@ class UserCharacter(Character):
self.select_character_core()
@property
def to_dict(self):
def to_dict(self) -> dict:
if self.char_type is None:
self.select_character_info(self.user)
r = {"is_uncapped_override": self.is_uncapped_override,
"is_uncapped": self.is_uncapped,
"uncap_cores": self.uncap_cores_to_dict,
"uncap_cores": self.uncap_cores_to_dict(),
"char_type": self.char_type,
"skill_id_uncap": self.skill.skill_id_uncap,
"skill_requires_uncap": self.skill.skill_requires_uncap,

View File

@@ -25,6 +25,7 @@ class Constant:
WORLD_MAP_FOLDER_PATH = './database/map/'
SONG_FILE_FOLDER_PATH = './database/songs/'
SQLITE_DATABASE_PATH = './database/arcaea_database.db'
DOWNLOAD_TIMES_LIMIT = Config.DOWNLOAD_TIMES_LIMIT
DOWNLOAD_TIME_GAP_LIMIT = Config.DOWNLOAD_TIME_GAP_LIMIT

View File

@@ -34,7 +34,7 @@ class PostError(ArcError):
class UserBan(ArcError):
# 用户封禁
def __init__(self, message=None, error_code=121, api_error_code=-999, extra_data=None) -> None:
def __init__(self, message=None, error_code=121, api_error_code=-202, extra_data=None) -> None:
super().__init__(message, error_code, api_error_code, extra_data)

View File

@@ -9,8 +9,8 @@ from .user import UserInfo
def get_song_unlock(client_song_map: dict) -> bytes:
'''处理可用歌曲bit返回bytes'''
user_song_unlock = [0] * 512
for i in range(0, 1024, 2):
user_song_unlock = [0] * Constant.LINK_PLAY_UNLOCK_LENGTH
for i in range(0, Constant.LINK_PLAY_UNLOCK_LENGTH*2, 2):
x = 0
y = 0
if str(i) in client_song_map:

View File

@@ -113,7 +113,7 @@ class UserPresentList:
self.presents: list = None
def to_dict(self) -> list:
def to_dict_list(self) -> list:
return [x.to_dict() for x in self.presents]
def select_user_presents(self) -> None:

View File

@@ -38,7 +38,6 @@ class Purchase:
return self.price
return self.orig_price
@property
def to_dict(self) -> dict:
price = self.price_displayed
r = {
@@ -143,9 +142,8 @@ class PurchaseList:
self.user = user
self.purchases: list = []
@property
def to_dict(self) -> list:
return [x.to_dict for x in self.purchases]
def to_dict_list(self) -> list:
return [x.to_dict() for x in self.purchases]
def select_from_type(self, item_type: str) -> 'PurchaseList':
self.c.execute('''select purchase_name from purchase_item where type = :a''', {

View File

@@ -1,7 +1,7 @@
from .user import UserInfo
from .song import Chart
from .score import UserScore
from .constant import Constant
from .score import UserScore
from .song import Chart
from .user import UserInfo
class RankList:
@@ -18,9 +18,8 @@ class RankList:
self.limit: int = 20
self.user = None
@property
def to_dict_list(self) -> list:
return [x.to_dict for x in self.list]
return [x.to_dict() for x in self.list]
def select_top(self) -> None:
'''

View File

@@ -18,7 +18,6 @@ class SaveData:
self.story_data = []
self.createdAt = 0
@property
def to_dict(self):
return {
"user_id": self.user.user_id,

View File

@@ -3,6 +3,7 @@ from time import time
from .constant import Constant
from .error import NoData, StaminaNotEnough
from .song import Chart
from .sql import Query, Sql
from .util import md5
from .world import WorldPlay
@@ -122,7 +123,6 @@ class Score:
self.rating = self.calculate_rating(self.song.chart_const, self.score)
return self.rating
@property
def to_dict(self) -> dict:
return {
"rating": self.rating,
@@ -158,20 +158,29 @@ class UserScore(Score):
raise NoData('No score data.')
self.user.select_user_about_character()
self.from_list(x)
def from_list(self, x: list) -> 'UserScore':
if self.song.song_id is None:
self.song.song_id = x[1]
if self.song.difficulty is None:
self.song.difficulty = x[2]
self.set_score(x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[12])
self.best_clear_type = int(x[11])
self.rating = float(x[13])
@property
def to_dict(self) -> dict:
r = super().to_dict
r['user_id'] = self.user.user_id
r['name'] = self.user.name
return self
def to_dict(self, has_user_info: bool = True) -> dict:
r = super().to_dict()
r['best_clear_type'] = self.best_clear_type
r['is_skill_sealed'] = self.user.is_skill_sealed
character = self.user.character_displayed
r['is_char_uncapped'] = character.is_uncapped_displayed
r['character'] = character.character_id
if has_user_info:
r['user_id'] = self.user.user_id
r['name'] = self.user.name
r['is_skill_sealed'] = self.user.is_skill_sealed
character = self.user.character_displayed
r['is_char_uncapped'] = character.is_uncapped_displayed
r['character'] = character.character_id
if self.rank:
r['rank'] = self.rank
return r
@@ -196,14 +205,13 @@ class UserPlay(UserScore):
self.ptt: Potential = None # 临时用来计算用户ptt的
self.world_play: 'WorldPlay' = None
@property
def to_dict(self) -> dict:
if self.is_world_mode is None:
return {}
elif not self.is_world_mode:
return {'global_rank': self.user.global_rank, 'user_rating': self.user.rating_ptt}
else:
r = self.world_play.to_dict
r = self.world_play.to_dict()
r['user_rating'] = self.user.rating_ptt
r['global_rank'] = self.user.global_rank
return r
@@ -384,6 +392,8 @@ class Potential:
self.s30: list = None
self.songs_selected: list = None
self.b30: list = None
@property
def value(self) -> float:
'''计算用户潜力值'''
@@ -432,6 +442,15 @@ class Potential:
i += 1
return rating_sum
def recent_30_to_dict_list(self) -> list:
if self.r30 is None:
self.select_recent_30()
return [{
'song_id': self.s30[i][:-1],
'difficulty': int(self.s30[i][-1]),
'rating': self.r30[i]
} for i in range(len(self.r30))]
def recent_30_update(self, pop_index: int, rating: float, song_id_difficulty: str) -> None:
self.r30.pop(pop_index)
self.s30.pop(pop_index)
@@ -449,3 +468,32 @@ class Potential:
sql_list.append(self.user.user_id)
self.c.execute(sql, sql_list)
class UserScoreList:
'''
用户分数查询类\
properties: `user` - `User`类或子类的实例
'''
def __init__(self, c=None, user=None):
self.c = c
self.user = user
self.scores: list = None
self.query: 'Query' = Query(['user_id', 'song_id', 'difficulty'], ['song_id'], [
'rating', 'difficulty', 'song_id', 'score', 'time_played'])
def to_dict_list(self) -> list:
return [x.to_dict(has_user_info=False) for x in self.scores]
def select_from_user(self, user=None) -> None:
'''获取用户的best_score数据'''
if user is not None:
self.user = user
self.query.query_append({'user_id': self.user.user_id})
self.query.sort += [{'column': 'rating', 'order': 'DESC'}]
print(self.query.sort)
x = Sql(self.c).select('best_score', query=self.query)
self.scores = [UserScore(self.c, self.user).from_list(i) for i in x]

View File

@@ -9,6 +9,12 @@ class Chart:
self.set_chart(song_id, difficulty)
self.defnum: int = None
def to_dict(self) -> dict:
return {
'difficulty': self.difficulty,
'chart_const': self.chart_const
}
@property
def chart_const(self) -> float:
return self.defnum / 10 if self.defnum else -1
@@ -37,6 +43,37 @@ class Song:
self.name: str = None
self.charts: dict = None
def to_dict(self) -> dict:
return {
'song_id': self.song_id,
'name': self.name,
'charts': [chart.to_dict() for chart in self.charts]
}
def from_list(self, x: list) -> 'Song':
if self.song_id is None:
self.song_id = x[0]
self.name = x[1]
self.charts = [Chart(self.c, self.song_id, 0), Chart(self.c, self.song_id, 1), Chart(
self.c, self.song_id, 2), Chart(self.c, self.song_id, 3)]
self.charts[0].defnum = x[2]
self.charts[1].defnum = x[3]
self.charts[2].defnum = x[4]
self.charts[3].defnum = x[5]
return self
def insert(self) -> None:
self.c.execute(
'''insert into chart values (?,?,?,?,?,?)''', (self.song_id, self.name, self.charts[0].defnum, self.charts[1].defnum, self.charts[2].defnum, self.charts[3].defnum))
def select(self, song_id: str = None) -> 'Song':
if song_id is not None:
self.song_id = song_id
self.c.execute('''select * from chart where song_id=:a''', {
'a': self.song_id})
x = self.c.fetchone()
if x is None:
raise NoData('The song `%s` does not exist.' % self.song_id)
return self.from_list(x)

View File

@@ -1,12 +1,17 @@
import sqlite3
from flask import current_app
import traceback
from certifi import where
from flask import current_app
from .constant import Constant
from .error import InputError
class Connect():
class Connect:
# 数据库连接类,上下文管理
def __init__(self, file_path='./database/arcaea_database.db'):
def __init__(self, file_path=Constant.SQLITE_DATABASE_PATH):
"""
数据库连接默认连接arcaea_database.db\
接受:文件路径\
@@ -34,15 +39,126 @@ class Connect():
return True
class Sql():
class Query:
'''查询参数类'''
@staticmethod
def select(c, table_name, target_column=[], query=None):
# 执行查询单句sql语句返回fetchall数据
# 使用准确查询,且在单表内
def __init__(self, query_able: list = None, quzzy_query_able: list = None, sort_able: list = None) -> None:
self.query_able: list = query_able
self.quzzy_query_able: list = quzzy_query_able
self.sort_able: list = sort_able
self.__limit: int = -1
self.__offset: int = 0
self.__query: dict = {} # {'name': 'admin'}
self.__fuzzy_query: dict = {} # {'name': 'dmi'}
# [{'column': 'user_id', 'order': 'ASC'}, ...]
self.__sort: list = []
@property
def limit(self) -> int:
return self.__limit
@limit.setter
def limit(self, limit: int) -> None:
if not isinstance(limit, int):
raise InputError(api_error_code=-101)
self.__limit = limit
@property
def offset(self) -> int:
return self.__offset
@offset.setter
def offset(self, offset: int) -> None:
if not isinstance(offset, int):
raise InputError(api_error_code=-101)
self.__offset = offset
@property
def query(self) -> dict:
return self.__query
@query.setter
def query(self, query: dict) -> None:
self.__query = {}
self.query_append(query)
def query_append(self, query: dict) -> None:
if not isinstance(query, dict):
raise InputError(api_error_code=-101)
if self.query_able is not None and query and not set(query).issubset(set(self.query_able)):
raise InputError(api_error_code=-102)
if not self.__query:
self.__query = query
else:
self.__query.update(query)
@property
def fuzzy_query(self) -> dict:
return self.__fuzzy_query
@fuzzy_query.setter
def fuzzy_query(self, fuzzy_query: dict) -> None:
self.__fuzzy_query = {}
self.fuzzy_query_append(fuzzy_query)
def fuzzy_query_append(self, fuzzy_query: dict) -> None:
if not isinstance(fuzzy_query, dict):
raise InputError(api_error_code=-101)
if self.quzzy_query_able is not None and fuzzy_query and not set(fuzzy_query).issubset(set(self.quzzy_query_able)):
raise InputError(api_error_code=-102)
if not self.__fuzzy_query:
self.__fuzzy_query = fuzzy_query
else:
self.__fuzzy_query.update(fuzzy_query)
@property
def sort(self) -> list:
return self.__sort
@sort.setter
def sort(self, sort: list) -> None:
if not isinstance(sort, list):
raise InputError(api_error_code=-101)
if self.sort_able is not None and sort:
for x in sort:
if not isinstance(x, dict):
raise InputError(api_error_code=-101)
if 'column' not in x or x['column'] not in self.sort_able:
raise InputError(api_error_code=-103)
if 'order' not in x:
x['order'] = 'ASC'
else:
if x['order'] not in ['ASC', 'DESC']:
raise InputError(api_error_code=-104)
self.__sort = sort
def set_value(self, limit=-1, offset=0, query={}, fuzzy_query={}, sort=[]) -> None:
self.limit = limit
self.offset = offset
self.query = query
self.fuzzy_query = fuzzy_query
self.sort = sort
def from_data(self, d: dict) -> 'Query':
self.set_value(d.get('limit', -1), d.get('offset', 0),
d.get('query', {}), d.get('fuzzy_query', {}), d.get('sort', []))
return self
class Sql:
'''
数据库增查删改类
'''
def __init__(self, c=None) -> None:
self.c = c
def select(self, table_name: str, target_column: 'list' = [], query: 'Query' = None) -> list:
'''单表内行查询单句sql语句返回fetchall数据'''
sql = 'select '
sql_dict = {}
sql_list = []
if len(target_column) >= 2:
sql += target_column[0]
for i in range(1, len(target_column)):
@@ -53,34 +169,48 @@ class Sql():
else:
sql += '* from ' + table_name
where_field = []
where_value = []
if query:
for i in query.query:
where_field.append(i)
where_value.append(query.query[i])
if query is None:
self.c.execute(sql)
return self.c.fetchall()
if where_field and where_value:
where_key = []
where_like_key = []
for i in query.query:
where_key.append(i)
sql_list.append(query.query[i])
for i in query.fuzzy_query:
where_like_key.append(i)
sql_list.append('%' + query.fuzzy_query[i] + '%')
if where_key or where_like_key:
sql += ' where '
sql += where_field[0] + '=:' + where_field[0]
sql_dict[where_field[0]] = where_value[0]
if len(where_field) >= 2:
for i in range(1, len(where_field)):
sql_dict[where_field[i]] = where_value[i]
sql += ' and ' + where_field[i] + '=:' + where_field[i]
if where_key:
sql += where_key[0] + '=?'
if len(where_key) >= 2:
for i in range(1, len(where_key)):
sql += ' and ' + where_key[i] + '=?'
if where_like_key:
for i in where_like_key:
sql += ' and ' + i + ' like ?'
else:
sql += where_like_key[0] + ' like ?'
if query and query.sort:
if len(where_like_key) >= 2:
for i in range(1, len(where_key)):
sql += ' and ' + where_key[i] + ' like ?'
if query.sort:
sql += ' order by ' + \
query.sort[0]['column'] + ' ' + query.sort[0]['order']
for i in range(1, len(query.sort)):
sql += ', ' + query.sort[i]['column'] + \
' ' + query.sort[i]['order']
if query and query.limit >= 0:
sql += ' limit :limit offset :offset'
sql_dict['limit'] = query.limit
sql_dict['offset'] = query.offset
if query.limit >= 0:
sql += ' limit ? offset ?'
sql_list.append(query.limit)
sql_list.append(query.offset)
c.execute(sql, sql_dict)
return c.fetchall()
self.c.execute(sql, sql_list)
return self.c.fetchall()

View File

@@ -45,12 +45,16 @@ class User:
self.world_rank_score = None
self.ban_flag = None
@property
def hash_pwd(self) -> str:
'''`password`的SHA-256值'''
return hashlib.sha256(self.password.encode("utf8")).hexdigest()
class UserRegister(User):
def __init__(self, c) -> None:
super().__init__()
self.c = c
self.hash_pwd = None
def set_name(self, name: str):
if 3 <= len(name) <= 16:
@@ -67,7 +71,6 @@ class UserRegister(User):
def set_password(self, password: str):
if 8 <= len(password) <= 32:
self.password = password
self.hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
else:
raise InputError('Password is invalid.')
@@ -146,7 +149,6 @@ class UserLogin(User):
self.c = c
self.device_id = None
self.ip = None
self.hash_pwd = None
self.token = None
self.now = 0
@@ -155,7 +157,6 @@ class UserLogin(User):
def set_password(self, password: str):
self.password = password
self.hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
def set_device_id(self, device_id: str):
self.device_id = device_id
@@ -282,7 +283,7 @@ class UserInfo(User):
self.is_skill_sealed = False
self.is_hide_rating = False
self.recent_score = Score()
self.favorite_character = -1
self.favorite_character = None
self.max_stamina_notification_enabled = False
self.prog_boost = 0
@@ -411,7 +412,7 @@ class UserInfo(User):
y = self.c.fetchone()
best_clear_type = y[0] if y is not None else self.recent_score.clear_type
r = self.recent_score.to_dict
r = self.recent_score.to_dict()
r["best_clear_type"] = best_clear_type
return [r]
@@ -440,7 +441,7 @@ class UserInfo(User):
return {
"is_aprilfools": Config.IS_APRILFOOLS,
"curr_available_maps": self.curr_available_maps_list,
"character_stats": [x.to_dict for x in self.characters.characters],
"character_stats": [x.to_dict() for x in self.characters.characters],
"friends": self.friends,
"settings": {
"favorite_character": favorite_character_id,
@@ -473,14 +474,12 @@ class UserInfo(User):
"global_rank": self.global_rank
}
def select_user(self) -> None:
# 查user表所有信息
self.c.execute(
'''select * from user where user_id = :x''', {'x': self.user_id})
x = self.c.fetchone()
def from_list(self, x: list) -> 'UserInfo':
'''从数据库user表全部数据获取信息'''
if not x:
raise NoData('No user.', 108, -3)
return None
if self.user_id is None:
self.user_id = x[0]
self.name = x[1]
self.join_date = int(x[3])
self.user_code = x[4]
@@ -512,6 +511,18 @@ class UserInfo(User):
self.stamina = UserStamina(self.c, self)
self.stamina.set_value(x[32], x[33])
return self
def select_user(self) -> None:
# 查user表所有信息
self.c.execute(
'''select * from user where user_id = :x''', {'x': self.user_id})
x = self.c.fetchone()
if not x:
raise NoData('No user.', 108, -3)
self.from_list(x)
def select_user_about_current_map(self) -> None:
self.c.execute('''select current_map from user where user_id = :a''',
{'a': self.user_id})

View File

@@ -54,7 +54,6 @@ class Step:
self.speed_limit_value: int = None
self.plus_stamina_value: int = None
@property
def to_dict(self) -> dict:
r = {
'position': self.position,
@@ -152,7 +151,7 @@ class Map:
'custom_bg': self.custom_bg,
'stamina_cost': self.stamina_cost,
'step_count': self.step_count,
'steps': [s.to_dict for s in self.steps],
'steps': [s.to_dict() for s in self.steps],
}
def from_dict(self, raw_dict: dict) -> 'Map':
@@ -208,7 +207,6 @@ class UserMap(Map):
return rewards
@property
def rewards_for_climbing_to_dict(self) -> list:
rewards = []
for i in range(self.prev_position, self.curr_position+1):
@@ -441,11 +439,10 @@ class WorldPlay:
self.overdrive_extra: float = None
self.character_bonus_progress: float = None
@property
def to_dict(self) -> dict:
arcmap: 'UserMap' = self.user.current_map
r = {
"rewards": arcmap.rewards_for_climbing_to_dict,
"rewards": arcmap.rewards_for_climbing_to_dict(),
"exp": self.character_used.level.exp,
"level": self.character_used.level.level,
"base_progress": self.base_step_value,
@@ -484,7 +481,7 @@ class WorldPlay:
if self.user_play.beyond_gauge == 0:
r["user_map"]["steps"] = [
x.to_dict for x in arcmap.steps_for_climbing]
x.to_dict() for x in arcmap.steps_for_climbing]
else:
r["user_map"]["steps"] = len(arcmap.steps_for_climbing)