mirror of
https://github.com/Lost-MSth/Arcaea-server.git
synced 2026-02-10 01:37:27 +08:00
[Enhance] [Refactor] Add API about characters
- Add API endpoints about characters - Some small changes in refactoring
This commit is contained in:
@@ -55,8 +55,8 @@ class Role:
|
||||
{'a': self.role_id})
|
||||
x = self.c.fetchone()
|
||||
if x is None:
|
||||
raise NoData('The role `%s` does not exist.' %
|
||||
self.role_id, api_error_code=-200)
|
||||
raise NoData(
|
||||
f'The role `{self.role_id}` does not exist.', api_error_code=-200)
|
||||
self.caption = x[0]
|
||||
return self
|
||||
|
||||
@@ -133,19 +133,19 @@ class APIUser(UserOnline):
|
||||
'a': self.name})
|
||||
x = self.c.fetchone()
|
||||
if x is None:
|
||||
raise NoData('The user `%s` does not exist.' %
|
||||
self.name, api_error_code=-201, status=401)
|
||||
raise NoData(
|
||||
f'The user `{self.name}` does not exist.', api_error_code=-201, status=401)
|
||||
if x[1] == '':
|
||||
raise UserBan('The user `%s` is banned.' % self.name)
|
||||
raise UserBan(f'The user `{self.name}` is banned.')
|
||||
if self.hash_pwd != x[1]:
|
||||
raise NoAccess('The password is incorrect.',
|
||||
api_error_code=-201, status=401)
|
||||
|
||||
self.user_id = x[0]
|
||||
now = int(time() * 1000)
|
||||
self.token = sha256(
|
||||
self.api_token = sha256(
|
||||
(str(self.user_id) + str(now)).encode("utf8") + urandom(8)).hexdigest()
|
||||
|
||||
self.logout()
|
||||
self.c.execute('''insert into api_login values(?,?,?,?)''',
|
||||
(self.user_id, self.token, now, self.ip))
|
||||
(self.user_id, self.api_token, now, self.ip))
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from .config_manager import Config
|
||||
from .constant import Constant
|
||||
from .error import ArcError, InputError, ItemNotEnough, NoData
|
||||
from .item import Item, ItemCore
|
||||
from .item import CollectionItemMixin, ItemCore
|
||||
|
||||
|
||||
class Level:
|
||||
@@ -9,9 +9,9 @@ class Level:
|
||||
min_level = 1
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.max_level = None
|
||||
self.level = None
|
||||
self.exp = None
|
||||
self.max_level: int = None
|
||||
self.level: int = None
|
||||
self.exp: float = None
|
||||
|
||||
@property
|
||||
def level_exp(self):
|
||||
@@ -29,9 +29,9 @@ class Level:
|
||||
|
||||
a = []
|
||||
b = []
|
||||
for i in Constant.LEVEL_STEPS:
|
||||
a.append(i)
|
||||
b.append(Constant.LEVEL_STEPS[i])
|
||||
for k, v in Constant.LEVEL_STEPS.items():
|
||||
a.append(k)
|
||||
b.append(v)
|
||||
|
||||
if exp < b[0]: # 向下溢出,是异常状态,不该被try捕获,不然数据库无法回滚
|
||||
raise ValueError('EXP value error.')
|
||||
@@ -46,23 +46,10 @@ class Level:
|
||||
|
||||
class Skill:
|
||||
def __init__(self) -> None:
|
||||
self.skill_id = None
|
||||
self.skill_id_uncap = None
|
||||
self.skill_unlock_level = None
|
||||
self.skill_requires_uncap = None
|
||||
|
||||
|
||||
class Core(Item):
|
||||
item_type = 'core'
|
||||
|
||||
def __init__(self, core_type: str = '', amount: int = 0) -> None:
|
||||
super().__init__()
|
||||
self.item_id = core_type
|
||||
self.amount = amount
|
||||
self.is_available = True
|
||||
|
||||
def to_dict(self):
|
||||
return {'core_type': self.item_id, 'amount': self.amount}
|
||||
self.skill_id: str = None
|
||||
self.skill_id_uncap: str = None
|
||||
self.skill_unlock_level: int = None
|
||||
self.skill_requires_uncap: bool = None
|
||||
|
||||
|
||||
class CharacterValue:
|
||||
@@ -75,8 +62,7 @@ class CharacterValue:
|
||||
# 4/6859 = 0.00058317539
|
||||
if level <= 10:
|
||||
return 0.00058317539 * (level - 1) ** 3 * (value_20 - value_1) + value_1
|
||||
else:
|
||||
return - 0.00058317539 * (20 - level) ** 3 * (value_20 - value_1) + value_20
|
||||
return - 0.00058317539 * (20 - level) ** 3 * (value_20 - value_1) + value_20
|
||||
|
||||
# @staticmethod
|
||||
# def _calc_char_value_20(level: int, stata, statb, lva=1, lvb=20) -> float:
|
||||
@@ -95,44 +81,51 @@ class CharacterValue:
|
||||
return (level - lva) * (statb - stata) / (lvb - lva) + stata
|
||||
|
||||
def set_parameter(self, start: float = 0, mid: float = 0, end: float = 0):
|
||||
self.start = start
|
||||
self.mid = mid
|
||||
self.end = end
|
||||
self.start: float = start
|
||||
self.mid: float = mid
|
||||
self.end: float = end
|
||||
|
||||
def get_value(self, level: Level):
|
||||
if level.min_level <= level.level <= level.mid_level:
|
||||
return self._calc_char_value_20_math(level.level, self.start, self.mid)
|
||||
elif level.mid_level < level.level <= level.max_level:
|
||||
if level.mid_level < level.level <= level.max_level:
|
||||
return self._calc_char_value_30(level.level, self.mid, self.end)
|
||||
else:
|
||||
return 0
|
||||
return 0
|
||||
|
||||
|
||||
class Character:
|
||||
class Character(CollectionItemMixin):
|
||||
database_table_name = None
|
||||
|
||||
collection_item_const = {
|
||||
'name': 'character',
|
||||
'table_name': 'char_item',
|
||||
'table_primary_key': 'character_id',
|
||||
'id_name': 'character_id',
|
||||
'items_name': 'uncap_cores'
|
||||
}
|
||||
|
||||
def __init__(self, c=None) -> None:
|
||||
self.c = c
|
||||
|
||||
self.character_id = None
|
||||
self.name = None
|
||||
self.char_type = None
|
||||
self.is_uncapped = None
|
||||
self.is_uncapped_override = None
|
||||
self.character_id: int = None
|
||||
self.name: str = None
|
||||
self.char_type: int = None
|
||||
self.is_uncapped: bool = None
|
||||
self.is_uncapped_override: bool = None
|
||||
self.skill = Skill()
|
||||
self.level = Level()
|
||||
self.frag = CharacterValue()
|
||||
self.prog = CharacterValue()
|
||||
self.overdrive = CharacterValue()
|
||||
self.uncap_cores = []
|
||||
self.voice = None
|
||||
self.uncap_cores: list = []
|
||||
self.voice: list = None
|
||||
|
||||
@property
|
||||
def skill_id_displayed(self) -> str:
|
||||
return None
|
||||
|
||||
def uncap_cores_to_dict(self):
|
||||
return [x.to_dict() for x in self.uncap_cores]
|
||||
return [x.to_dict(character_format=True) for x in self.uncap_cores]
|
||||
|
||||
@property
|
||||
def is_uncapped_displayed(self) -> bool:
|
||||
@@ -144,11 +137,71 @@ class Character:
|
||||
# 应该是只有对立这样
|
||||
return self.character_id == 1
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
pass
|
||||
def to_dict(self, has_cores=False) -> dict:
|
||||
# 用于API里,游戏内的数据不太一样,故不能super
|
||||
r = {
|
||||
'character_id': self.character_id,
|
||||
'name': self.name,
|
||||
'char_type': self.char_type,
|
||||
'is_uncapped': self.is_uncapped,
|
||||
'max_level': self.level.max_level,
|
||||
'skill_id': self.skill.skill_id,
|
||||
'skill_unlock_level': self.skill.skill_unlock_level,
|
||||
'skill_requires_uncap': self.skill.skill_requires_uncap,
|
||||
'skill_id_uncap': self.skill.skill_id_uncap,
|
||||
'frag1': self.frag.start,
|
||||
'frag20': self.frag.mid,
|
||||
'frag30': self.frag.end,
|
||||
'prog1': self.prog.start,
|
||||
'prog20': self.prog.mid,
|
||||
'prog30': self.prog.end,
|
||||
'overdrive1': self.overdrive.start,
|
||||
'overdrive20': self.overdrive.mid,
|
||||
'overdrive30': self.overdrive.end,
|
||||
}
|
||||
if has_cores:
|
||||
r['uncap_cores'] = self.uncap_cores_to_dict()
|
||||
return r
|
||||
|
||||
def from_list(self, l: list) -> 'Character':
|
||||
pass
|
||||
def from_list(self, l: tuple) -> 'Character':
|
||||
self.character_id = l[0]
|
||||
self.name = l[1]
|
||||
self.level.max_level = l[2]
|
||||
self.frag.set_parameter(l[3], l[6], l[9])
|
||||
self.prog.set_parameter(l[4], l[7], l[10])
|
||||
self.overdrive.set_parameter(l[5], l[8], l[11])
|
||||
self.skill.skill_id = l[12]
|
||||
self.skill.skill_unlock_level = l[13]
|
||||
self.skill.skill_requires_uncap = l[14] == 1
|
||||
self.skill.skill_id_uncap = l[15]
|
||||
self.char_type = l[16]
|
||||
self.is_uncapped = l[17] == 1
|
||||
return self
|
||||
|
||||
def select(self, character_id: int = None) -> 'Character':
|
||||
if character_id is not None:
|
||||
self.character_id = character_id
|
||||
self.c.execute(
|
||||
'select * from character where character_id = ?', (self.character_id,))
|
||||
x = self.c.fetchone()
|
||||
if not x:
|
||||
raise NoData(
|
||||
f'No such character: {self.character_id}', api_error_code=-130)
|
||||
return self.from_list(x)
|
||||
|
||||
def update(self) -> None:
|
||||
self.c.execute('''update character set name = ?, max_level = ?, frag1 = ?, frag20 = ?, frag30 = ?, prog1 = ?, prog20 = ?, prog30 = ?, overdrive1 = ?, overdrive20 = ?, overdrive30 = ?, skill_id = ?, skill_unlock_level = ?, skill_requires_uncap = ?, skill_id_uncap = ?, char_type = ?, is_uncapped = ? where character_id = ?''', (
|
||||
self.name, self.level.max_level, self.frag.start, self.frag.mid, self.frag.end, self.prog.start, self.prog.mid, self.prog.end, self.overdrive.start, self.overdrive.mid, self.overdrive.end, self.skill.skill_id, self.skill.skill_unlock_level, self.skill.skill_requires_uncap, self.skill.skill_id_uncap, self.char_type, self.is_uncapped, self.character_id))
|
||||
|
||||
def select_character_core(self):
|
||||
# 获取此角色所需核心
|
||||
self.c.execute(
|
||||
'''select item_id, amount from char_item where character_id = ? and type="core"''', (self.character_id,))
|
||||
x = self.c.fetchall()
|
||||
if x:
|
||||
self.uncap_cores = []
|
||||
for i in x:
|
||||
self.uncap_cores.append(ItemCore(self.c, i[0], i[1]))
|
||||
|
||||
|
||||
class UserCharacter(Character):
|
||||
@@ -169,27 +222,16 @@ class UserCharacter(Character):
|
||||
'''对外显示的技能id'''
|
||||
if self.is_uncapped_displayed and self.skill.skill_id_uncap:
|
||||
return self.skill.skill_id_uncap
|
||||
elif self.skill.skill_id and self.level.level >= self.skill.skill_unlock_level:
|
||||
if self.skill.skill_id and self.level.level >= self.skill.skill_unlock_level:
|
||||
return self.skill.skill_id
|
||||
else:
|
||||
return None
|
||||
|
||||
def select_character_core(self):
|
||||
# 获取此角色所需核心
|
||||
self.c.execute(
|
||||
'''select item_id, amount from char_item where character_id = ? and type="core"''', (self.character_id,))
|
||||
x = self.c.fetchall()
|
||||
if x:
|
||||
self.uncap_cores = []
|
||||
for i in x:
|
||||
self.uncap_cores.append(Core(i[0], i[1]))
|
||||
return None
|
||||
|
||||
def select_character_uncap_condition(self, user=None):
|
||||
# parameter: user - User类或子类的实例
|
||||
# 获取此角色的觉醒信息
|
||||
if user:
|
||||
self.user = user
|
||||
self.c.execute('''select is_uncapped, is_uncapped_override from %s where user_id = :a and character_id = :b''' % self.database_table_name,
|
||||
self.c.execute(f'''select is_uncapped, is_uncapped_override from {self.database_table_name} where user_id = :a and character_id = :b''',
|
||||
{'a': self.user.user_id, 'b': self.character_id})
|
||||
|
||||
x = self.c.fetchone()
|
||||
@@ -206,7 +248,7 @@ class UserCharacter(Character):
|
||||
# 获取所给用户此角色信息
|
||||
if user:
|
||||
self.user = user
|
||||
self.c.execute('''select * from %s a,character b where a.user_id=? and a.character_id=b.character_id and a.character_id=?''' % self.database_table_name,
|
||||
self.c.execute(f'''select * from {self.database_table_name} a,character b where a.user_id=? and a.character_id=b.character_id and a.character_id=?''',
|
||||
(self.user.user_id, self.character_id))
|
||||
|
||||
y = self.c.fetchone()
|
||||
@@ -228,7 +270,7 @@ class UserCharacter(Character):
|
||||
self.skill.skill_unlock_level = y[19]
|
||||
self.skill.skill_requires_uncap = y[20] == 1
|
||||
|
||||
if self.character_id == 21 or self.character_id == 46:
|
||||
if self.character_id in (21, 46):
|
||||
self.voice = [0, 1, 2, 3, 100, 1000, 1001]
|
||||
|
||||
self.select_character_core()
|
||||
@@ -267,7 +309,7 @@ class UserCharacter(Character):
|
||||
# 切换觉醒状态
|
||||
if user:
|
||||
self.user = user
|
||||
self.c.execute('''select is_uncapped, is_uncapped_override from %s where user_id = :a and character_id = :b''' % self.database_table_name,
|
||||
self.c.execute(f'''select is_uncapped, is_uncapped_override from {self.database_table_name} where user_id = :a and character_id = :b''',
|
||||
{'a': self.user.user_id, 'b': self.character_id})
|
||||
|
||||
x = self.c.fetchone()
|
||||
@@ -277,7 +319,7 @@ class UserCharacter(Character):
|
||||
self.c.execute('''update user set is_char_uncapped_override = :a where user_id = :b''', {
|
||||
'a': 1 if x[1] == 0 else 0, 'b': self.user.user_id})
|
||||
|
||||
self.c.execute('''update %s set is_uncapped_override = :a where user_id = :b and character_id = :c''' % self.database_table_name, {
|
||||
self.c.execute(f'''update {self.database_table_name} set is_uncapped_override = :a where user_id = :b and character_id = :c''', {
|
||||
'a': 1 if x[1] == 0 else 0, 'b': self.user.user_id, 'c': self.character_id})
|
||||
|
||||
self.is_uncapped_override = x[1] == 0
|
||||
@@ -311,7 +353,7 @@ class UserCharacter(Character):
|
||||
raise ItemNotEnough('The cores are not enough.')
|
||||
|
||||
for i in self.uncap_cores:
|
||||
ItemCore(self.c, i, reverse=True).user_claim_item(self.user)
|
||||
i.user_claim_item(self.user, reverse=True)
|
||||
|
||||
self.c.execute('''update user_char set is_uncapped=1, is_uncapped_override=0 where user_id=? and character_id=?''',
|
||||
(self.user.user_id, self.character_id))
|
||||
@@ -382,7 +424,7 @@ class UserCharacterList:
|
||||
|
||||
def select_user_characters(self):
|
||||
self.c.execute(
|
||||
'''select character_id from %s where user_id=?''' % self.database_table_name, (self.user.user_id,))
|
||||
f'''select character_id from {self.database_table_name} where user_id=?''', (self.user.user_id,))
|
||||
x = self.c.fetchall()
|
||||
self.characters: list = []
|
||||
if x:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from .config_manager import Config
|
||||
|
||||
ARCAEA_SERVER_VERSION = 'v2.11.0'
|
||||
ARCAEA_SERVER_VERSION = 'v2.11.0.1'
|
||||
|
||||
|
||||
class Constant:
|
||||
|
||||
@@ -96,7 +96,7 @@ class Course:
|
||||
'''select * from course where course_id = ?''', (self.course_id,))
|
||||
x = self.c.fetchone()
|
||||
if x is None:
|
||||
raise NoData('The course `%s` is not found.' % self.course_id)
|
||||
raise NoData(f'The course `{self.course_id}` is not found.')
|
||||
return self.from_list(x)
|
||||
|
||||
def select_course_chart(self) -> None:
|
||||
@@ -151,7 +151,8 @@ class Course:
|
||||
|
||||
class UserCourse(Course):
|
||||
'''
|
||||
用户课题类\
|
||||
用户课题类
|
||||
|
||||
parameter: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
@@ -200,7 +201,8 @@ class UserCourse(Course):
|
||||
|
||||
class UserCourseList:
|
||||
'''
|
||||
用户课题列表类\
|
||||
用户课题列表类
|
||||
|
||||
parameter: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
@@ -237,8 +239,9 @@ class UserCourseList:
|
||||
|
||||
class CoursePlay(UserCourse):
|
||||
'''
|
||||
课题模式打歌类,联动UserPlay\
|
||||
parameter: `user` - `UserOnline`类或子类的实例\
|
||||
课题模式打歌类,联动UserPlay
|
||||
|
||||
parameter: `user` - `UserOnline`类或子类的实例
|
||||
'user_play` - `UserPlay`类的实例
|
||||
'''
|
||||
|
||||
|
||||
@@ -88,7 +88,8 @@ class SonglistParser:
|
||||
|
||||
class UserDownload:
|
||||
'''
|
||||
用户下载类\
|
||||
用户下载类
|
||||
|
||||
properties: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
@@ -158,8 +159,7 @@ class UserDownload:
|
||||
if prefix[-1] != '/':
|
||||
prefix += '/'
|
||||
return f'{prefix}{self.song_id}/{self.file_name}?t={self.token}'
|
||||
else:
|
||||
return url_for('download', file_path=f'{self.song_id}/{self.file_name}', t=self.token, _external=True)
|
||||
return url_for('download', file_path=f'{self.song_id}/{self.file_name}', t=self.token, _external=True)
|
||||
|
||||
@property
|
||||
def hash(self) -> str:
|
||||
@@ -243,7 +243,7 @@ class DownloadList(UserDownload):
|
||||
re['audio']['3'] = {"checksum": x.hash, "url": x.url}
|
||||
else:
|
||||
re['audio']['3'] = {"checksum": x.hash}
|
||||
elif i == 'video.mp4' or i == 'video_audio.ogg':
|
||||
elif i in ('video.mp4', 'video_audio.ogg'):
|
||||
if 'additional_files' not in re:
|
||||
re['additional_files'] = []
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ class DatabaseInit:
|
||||
|
||||
def table_init(self) -> None:
|
||||
'''初始化数据库结构'''
|
||||
with open(self.sql_path, 'r') as f:
|
||||
with open(self.sql_path, 'r', encoding='utf-8') as f:
|
||||
self.c.executescript(f.read())
|
||||
self.c.execute('''insert into config values("version", :a);''', {
|
||||
'a': ARCAEA_SERVER_VERSION})
|
||||
@@ -129,7 +129,7 @@ class DatabaseInit:
|
||||
|
||||
x._insert_user_char()
|
||||
|
||||
self.c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
|
||||
self.c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
|
||||
character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled, current_map, ticket, prog_boost, email)
|
||||
values(:user_id, :name, :password, :join_date, :user_code, 0, 0, 0, 0, 0, 0, -1, 0, '', :memories, 0, :email)
|
||||
''', {'user_code': x.user_code, 'user_id': x.user_id, 'join_date': now, 'name': x.name, 'password': '41e5653fc7aeb894026d6bb7b2db7f65902b454945fa8fd65a6327047b5277fb', 'memories': 114514, 'email': x.email})
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from .config_manager import Config
|
||||
from .error import InputError, ItemNotEnough, ItemUnavailable, NoData
|
||||
from .error import DataExist, InputError, ItemNotEnough, ItemUnavailable, NoData
|
||||
|
||||
|
||||
class Item:
|
||||
@@ -73,7 +73,8 @@ class UserItem(Item):
|
||||
|
||||
def select_user_item(self, user=None):
|
||||
'''
|
||||
查询用户item\
|
||||
查询用户item
|
||||
|
||||
parameter: `user` - `User`类或子类的实例
|
||||
'''
|
||||
if user is not None:
|
||||
@@ -102,8 +103,7 @@ class NormalItem(UserItem):
|
||||
if x[0] == 0:
|
||||
self.is_available = False
|
||||
raise ItemUnavailable('The item is unavailable.')
|
||||
else:
|
||||
self.is_available = True
|
||||
self.is_available = True
|
||||
else:
|
||||
raise NoData('No item data.')
|
||||
|
||||
@@ -142,16 +142,29 @@ class PositiveItem(UserItem):
|
||||
class ItemCore(PositiveItem):
|
||||
item_type = 'core'
|
||||
|
||||
def __init__(self, c, core=None, reverse=False) -> None:
|
||||
def __init__(self, c, core_type: str = '', amount: int = 0) -> None:
|
||||
super().__init__(c)
|
||||
self.is_available = True
|
||||
if core:
|
||||
self.item_id = core.item_id
|
||||
self.amount = - core.amount if reverse else core.amount
|
||||
self.item_id = core_type
|
||||
self.amount = amount
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.item_id + '_' + str(self.amount)
|
||||
|
||||
def to_dict(self, has_is_available: bool = False, has_amount: bool = True, character_format: bool = False) -> dict:
|
||||
if character_format:
|
||||
# 搭档的core是特殊格式的
|
||||
return {'core_type': self.item_id, 'amount': self.amount}
|
||||
return super().to_dict(has_is_available=has_is_available, has_amount=has_amount)
|
||||
|
||||
def user_claim_item(self, user, reverse: bool = False) -> None:
|
||||
# 骚操作,将amount变为负数后使用再变回来
|
||||
if reverse:
|
||||
self.amount = -self.amount
|
||||
super().user_claim_item(user)
|
||||
if reverse:
|
||||
self.amount = -self.amount
|
||||
|
||||
|
||||
class ItemCharacter(UserItem):
|
||||
item_type = 'character'
|
||||
@@ -257,16 +270,10 @@ class CourseBanner(NormalItem):
|
||||
class Single(NormalItem):
|
||||
item_type = 'single'
|
||||
|
||||
def __init__(self, c) -> None:
|
||||
super().__init__(c)
|
||||
|
||||
|
||||
class Pack(NormalItem):
|
||||
item_type = 'pack'
|
||||
|
||||
def __init__(self, c) -> None:
|
||||
super().__init__(c)
|
||||
|
||||
|
||||
class ProgBoost(UserItem):
|
||||
item_type = 'prog_boost_300'
|
||||
@@ -276,7 +283,8 @@ class ProgBoost(UserItem):
|
||||
|
||||
def user_claim_item(self, user):
|
||||
'''
|
||||
世界模式prog_boost\
|
||||
世界模式prog_boost
|
||||
|
||||
parameters: `user` - `UserOnline`类或子类的实例
|
||||
'''
|
||||
user.update_user_one_column('prog_boost', 300)
|
||||
@@ -290,7 +298,7 @@ class Stamina6(UserItem):
|
||||
|
||||
def user_claim_item(self, user):
|
||||
'''
|
||||
世界模式记忆源点或残片买体力+6\
|
||||
世界模式记忆源点或残片买体力+6
|
||||
顺手清一下世界模式过载状态
|
||||
'''
|
||||
user.select_user_about_stamina()
|
||||
@@ -386,8 +394,8 @@ class ItemFactory:
|
||||
|
||||
class UserItemList:
|
||||
'''
|
||||
用户的item列表\
|
||||
注意只能查在user_item里面的,character不行\
|
||||
用户的item列表
|
||||
注意只能查在user_item里面的,character不行
|
||||
properties: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
@@ -420,3 +428,63 @@ class UserItemList:
|
||||
self.items.append(ItemFactory.from_dict(
|
||||
{'item_id': i[0], 'amount': amount, 'item_type': item_type}, self.c))
|
||||
return self
|
||||
|
||||
|
||||
class CollectionItemMixin:
|
||||
'''
|
||||
批量修改一些集合中的items
|
||||
'''
|
||||
collection_item_const = {
|
||||
'name': 'collection',
|
||||
'table_name': 'collection_item',
|
||||
'table_primary_key': 'collection_id',
|
||||
'id_name': 'collection_id',
|
||||
'items_name': 'items'
|
||||
}
|
||||
|
||||
def add_items(self, items: 'list[Item]') -> None:
|
||||
collection_id: 'str' = getattr(
|
||||
self, self.collection_item_const['id_name'])
|
||||
collection_items: 'list[Item]' = getattr(
|
||||
self, self.collection_item_const['items_name'])
|
||||
|
||||
for i in items:
|
||||
if not i.select_exists():
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}`', api_error_code=-121)
|
||||
if i in collection_items:
|
||||
raise DataExist(
|
||||
f'Item `{i.item_type}`: `{i.item_id}` already exists in {self.collection_item_const["name"]} `{collection_id}`', api_error_code=-123)
|
||||
self.c.executemany(f'''insert into {self.collection_item_const["table_name"]} values (?, ?, ?, ?)''', [
|
||||
(collection_id, i.item_id, i.item_type, i.amount) for i in items])
|
||||
collection_items.extend(items)
|
||||
|
||||
def remove_items(self, items: 'list[Item]') -> None:
|
||||
collection_id: 'str' = getattr(
|
||||
self, self.collection_item_const['id_name'])
|
||||
collection_items: 'list[Item]' = getattr(
|
||||
self, self.collection_item_const['items_name'])
|
||||
|
||||
for i in items:
|
||||
if i not in collection_items:
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}` in {self.collection_item_const["name"]} `{collection_id}`', api_error_code=-124)
|
||||
self.c.executemany(f'''delete from {self.collection_item_const["table_name"]} where {self.collection_item_const["table_primary_key"]}=? and item_id=? and type=?''', [
|
||||
(collection_id, i.item_id, i.item_type) for i in items])
|
||||
for i in items:
|
||||
collection_items.remove(i)
|
||||
|
||||
def update_items(self, items: 'list[Item]') -> None:
|
||||
collection_id: 'str' = getattr(
|
||||
self, self.collection_item_const['id_name'])
|
||||
collection_items: 'list[Item]' = getattr(
|
||||
self, self.collection_item_const['items_name'])
|
||||
|
||||
for i in items:
|
||||
if i not in collection_items:
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}` in {self.collection_item_const["name"]} `{collection_id}`', api_error_code=-124)
|
||||
self.c.executemany(f'''update {self.collection_item_const["table_name"]} set amount=? where {self.collection_item_const["table_primary_key"]}=? and item_id=? and type=?''', [
|
||||
(i.amount, collection_id, i.item_id, i.item_type) for i in items])
|
||||
for i in items:
|
||||
collection_items[collection_items.index(i)].amount = i.amount
|
||||
|
||||
@@ -103,9 +103,9 @@ class RemoteMultiPlayer:
|
||||
sock.sendall(bytes(data + "\n", "utf-8"))
|
||||
try:
|
||||
received = str(sock.recv(1024), "utf-8").strip()
|
||||
except socket.timeout:
|
||||
except socket.timeout as e:
|
||||
raise Timeout(
|
||||
'Timeout when waiting for data from link play server.', status=400)
|
||||
'Timeout when waiting for data from link play server.', status=400) from e
|
||||
# print(received)
|
||||
return received
|
||||
|
||||
|
||||
@@ -53,8 +53,7 @@ class RefreshAllScoreRating(BaseOperation):
|
||||
where_values = []
|
||||
for k in y:
|
||||
ptt = Score.calculate_rating(defnum, k[1])
|
||||
if ptt < 0:
|
||||
ptt = 0
|
||||
ptt = max(ptt, 0)
|
||||
values.append((ptt,))
|
||||
where_values.append((k[0], i[0], j))
|
||||
if values:
|
||||
@@ -76,7 +75,7 @@ class RefreshSongFileCache(BaseOperation):
|
||||
|
||||
class SaveUpdateScore(BaseOperation):
|
||||
'''
|
||||
云存档更新成绩,是覆盖式更新\
|
||||
云存档更新成绩,是覆盖式更新
|
||||
提供user参数时,只更新该用户的成绩,否则更新所有用户的成绩
|
||||
'''
|
||||
_name = 'save_update_score'
|
||||
@@ -125,8 +124,7 @@ class SaveUpdateScore(BaseOperation):
|
||||
if i['song_id'] in song_chart_const:
|
||||
rating = Score.calculate_rating(
|
||||
song_chart_const[i['song_id']][i['difficulty']] / 10, i['score'])
|
||||
if rating < 0:
|
||||
rating = 0
|
||||
rating = max(rating, 0)
|
||||
|
||||
y = f'{i["song_id"]}{i["difficulty"]}'
|
||||
if y in clear_state:
|
||||
@@ -143,7 +141,7 @@ class SaveUpdateScore(BaseOperation):
|
||||
def _all_update(self):
|
||||
with Connect() as c:
|
||||
c.execute(
|
||||
f'''select song_id, rating_pst, rating_prs, rating_ftr, rating_byn from chart''')
|
||||
'''select song_id, rating_pst, rating_prs, rating_ftr, rating_byn from chart''')
|
||||
song_chart_const = {i[0]: [i[1], i[2], i[3], i[4]]
|
||||
for i in c.fetchall()} # chart const * 10
|
||||
c.execute('''select user_id from user_save''')
|
||||
@@ -162,8 +160,7 @@ class SaveUpdateScore(BaseOperation):
|
||||
if i['song_id'] in song_chart_const:
|
||||
rating = Score.calculate_rating(
|
||||
song_chart_const[i['song_id']][i['difficulty']] / 10, i['score'])
|
||||
if rating < 0:
|
||||
rating = 0
|
||||
rating = max(rating, 0)
|
||||
|
||||
y = f'{i["song_id"]}{i["difficulty"]}'
|
||||
if y in clear_state:
|
||||
@@ -180,7 +177,7 @@ class SaveUpdateScore(BaseOperation):
|
||||
|
||||
class UnlockUserItem(BaseOperation):
|
||||
'''
|
||||
全解锁/锁定用户物品\
|
||||
全解锁/锁定用户物品
|
||||
提供user参数时,只更新该用户的,否则更新所有用户的
|
||||
'''
|
||||
_name = 'unlock_user_item'
|
||||
@@ -198,7 +195,7 @@ class UnlockUserItem(BaseOperation):
|
||||
self.user.user_id = int(user_id)
|
||||
if method in ['unlock', 'lock']:
|
||||
self.method = method
|
||||
if isinstance(item_types, list) and all([i in self.ALLOW_TYPES for i in item_types]):
|
||||
if isinstance(item_types, list) and all(i in self.ALLOW_TYPES for i in item_types):
|
||||
self.item_types = item_types
|
||||
|
||||
def run(self):
|
||||
|
||||
@@ -1,10 +1,18 @@
|
||||
from time import time
|
||||
|
||||
from .error import ArcError, DataExist, NoData
|
||||
from .item import ItemFactory
|
||||
from .error import ArcError, NoData
|
||||
from .item import CollectionItemMixin, ItemFactory
|
||||
|
||||
|
||||
class Present:
|
||||
class Present(CollectionItemMixin):
|
||||
collection_item_const = {
|
||||
'name': 'present',
|
||||
'table_name': 'present_item',
|
||||
'table_primary_key': 'present_id',
|
||||
'id_name': 'present_id',
|
||||
'items_name': 'items'
|
||||
}
|
||||
|
||||
def __init__(self, c=None) -> None:
|
||||
self.c = c
|
||||
self.present_id: str = None
|
||||
@@ -109,47 +117,11 @@ class Present:
|
||||
self.c.execute('''update present set expire_ts=?, description=? where present_id=?''',
|
||||
(self.expire_ts, self.description, self.present_id))
|
||||
|
||||
def remove_items(self, items: list) -> None:
|
||||
'''删除present_item表中的物品'''
|
||||
for i in items:
|
||||
if i not in self.items:
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}` in present `{self.present_id}`', api_error_code=-124)
|
||||
self.c.executemany('''delete from present_item where present_id=? and item_id=? and type=?''', [
|
||||
(self.present_id, i.item_id, i.item_type) for i in items])
|
||||
for i in items:
|
||||
self.items.remove(i)
|
||||
|
||||
def add_items(self, items: list) -> None:
|
||||
'''添加物品到present_item表'''
|
||||
for i in items:
|
||||
if not i.select_exists():
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}`', api_error_code=-121)
|
||||
if i in self.items:
|
||||
raise DataExist(
|
||||
f'Item `{i.item_type}`: `{i.item_id}` already exists in present `{self.present_id}`', api_error_code=-123)
|
||||
self.c.executemany('''insert into present_item values(?,?,?,?)''', [
|
||||
(self.present_id, i.item_id, i.item_type, i.amount) for i in items])
|
||||
self.items.extend(items)
|
||||
|
||||
def update_items(self, items: list) -> None:
|
||||
'''更新present_item表中的物品'''
|
||||
for i in items:
|
||||
if i not in self.items:
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}` in present `{self.present_id}`', api_error_code=-124)
|
||||
self.c.executemany('''update present_item set amount=? where present_id=? and item_id=? and type=?''', [
|
||||
(i.amount, self.present_id, i.item_id, i.item_type) for i in items])
|
||||
|
||||
for i in items:
|
||||
self.items[self.items.index(i)].amount = i.amount
|
||||
|
||||
|
||||
class UserPresent(Present):
|
||||
'''
|
||||
用户登录奖励类\
|
||||
忽视了description的多语言\
|
||||
用户登录奖励类
|
||||
忽视了description的多语言
|
||||
properties: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
|
||||
@@ -1,14 +1,21 @@
|
||||
from time import time
|
||||
|
||||
from .error import DataExist, InputError, NoData, TicketNotEnough
|
||||
from .item import ItemFactory
|
||||
from .error import InputError, NoData, TicketNotEnough
|
||||
from .item import CollectionItemMixin, ItemFactory
|
||||
|
||||
|
||||
class Purchase:
|
||||
class Purchase(CollectionItemMixin):
|
||||
'''
|
||||
购买类\
|
||||
购买类
|
||||
properties: `user` - `User`类或子类的实例
|
||||
'''
|
||||
collection_item_const = {
|
||||
'name': 'purchase',
|
||||
'table_name': 'purchase_item',
|
||||
'table_primary_key': 'purchase_name',
|
||||
'id_name': 'purchase_name',
|
||||
'items_name': 'items'
|
||||
}
|
||||
|
||||
def __init__(self, c=None, user=None):
|
||||
self.c = c
|
||||
@@ -212,45 +219,11 @@ class Purchase:
|
||||
self.c.execute('''update purchase set price=:a, orig_price=:b, discount_from=:c, discount_to=:d, discount_reason=:e where purchase_name=:f''', {
|
||||
'a': self.price, 'b': self.orig_price, 'c': self.discount_from, 'd': self.discount_to, 'e': self.discount_reason, 'f': self.purchase_name})
|
||||
|
||||
def add_items(self, items: list) -> None:
|
||||
'''添加purchase_item表'''
|
||||
for i in items:
|
||||
if not i.select_exists():
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}`', api_error_code=-121)
|
||||
if i in self.items:
|
||||
raise DataExist(
|
||||
f'Item `{i.item_type}`: `{i.item_id}` already exists in purchase `{self.purchase_name}`', api_error_code=-123)
|
||||
self.c.executemany('''insert into purchase_item values (?, ?, ?, ?)''', [
|
||||
(self.purchase_name, i.item_id, i.item_type, i.amount) for i in items])
|
||||
self.items.extend(items)
|
||||
|
||||
def remove_items(self, items: list) -> None:
|
||||
'''删除purchase_item表'''
|
||||
for i in items:
|
||||
if i not in self.items:
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}` in purchase `{self.purchase_name}`', api_error_code=-124)
|
||||
self.c.executemany('''delete from purchase_item where purchase_name=? and item_id=? and type=?''', [
|
||||
(self.purchase_name, i.item_id, i.item_type) for i in items])
|
||||
for i in items:
|
||||
self.items.remove(i)
|
||||
|
||||
def update_items(self, items: list) -> None:
|
||||
'''更新purchase_item表,只能更新amount'''
|
||||
for i in items:
|
||||
if i not in self.items:
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}` in purchase `{self.purchase_name}`', api_error_code=-124)
|
||||
self.c.executemany('''update purchase_item set amount=? where purchase_name=? and item_id=? and type=?''', [
|
||||
(i.amount, self.purchase_name, i.item_id, i.item_type) for i in items])
|
||||
for i in items:
|
||||
self.items[self.items.index(i)].amount = i.amount
|
||||
|
||||
|
||||
class PurchaseList:
|
||||
'''
|
||||
购买列表类\
|
||||
购买列表类
|
||||
|
||||
property: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
|
||||
@@ -7,8 +7,9 @@ from .user import UserInfo
|
||||
|
||||
class RankList:
|
||||
'''
|
||||
排行榜类\
|
||||
默认limit=20,limit<0认为是all\
|
||||
排行榜类
|
||||
默认limit=20,limit<0认为是all
|
||||
|
||||
property: `user` - `User`类或者子类的实例
|
||||
'''
|
||||
|
||||
@@ -85,7 +86,8 @@ class RankList:
|
||||
@staticmethod
|
||||
def get_my_rank_parameter(my_rank: int, amount: int, all_limit: int = 20, max_local_position: int = Constant.MY_RANK_MAX_LOCAL_POSITION, max_global_position: int = Constant.MY_RANK_MAX_GLOBAL_POSITION):
|
||||
'''
|
||||
计算我的排名中的查询参数\
|
||||
计算我的排名中的查询参数
|
||||
|
||||
returns:
|
||||
`sql_limit`: int - 查询limit参数
|
||||
`sql_offset`: int - 查询offset参数
|
||||
@@ -103,7 +105,7 @@ class RankList:
|
||||
need_myself = True
|
||||
elif amount - my_rank < all_limit - max_local_position: # 后方人数不足,显示排名
|
||||
sql_offset = amount - all_limit
|
||||
elif my_rank >= max_local_position and my_rank <= max_global_position - all_limit + max_local_position - 1: # 前方人数足够,显示排名
|
||||
elif max_local_position <= my_rank <= max_global_position - all_limit + max_local_position - 1: # 前方人数足够,显示排名
|
||||
sql_offset = my_rank - max_local_position
|
||||
else: # 我已经忘了这是什么了
|
||||
sql_offset = max_global_position - all_limit
|
||||
|
||||
@@ -1,8 +1,16 @@
|
||||
from .error import DataExist, NoData, RedeemUnavailable
|
||||
from .item import ItemFactory
|
||||
from .error import NoData, RedeemUnavailable
|
||||
from .item import CollectionItemMixin, ItemFactory
|
||||
|
||||
|
||||
class Redeem:
|
||||
class Redeem(CollectionItemMixin):
|
||||
collection_item_const = {
|
||||
'name': 'redeem',
|
||||
'table_name': 'redeem_item',
|
||||
'table_primary_key': 'code',
|
||||
'id_name': 'code',
|
||||
'items_name': 'items'
|
||||
}
|
||||
|
||||
def __init__(self, c=None) -> None:
|
||||
self.c = c
|
||||
self.code: str = None
|
||||
@@ -87,46 +95,11 @@ class Redeem:
|
||||
self.c.execute('''update redeem set type=? where code=?''',
|
||||
(self.redeem_type, self.code))
|
||||
|
||||
def remove_items(self, items: list) -> None:
|
||||
'''删除redeem_item表中的物品'''
|
||||
for i in items:
|
||||
if i not in self.items:
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}` in redeem `{self.code}`', api_error_code=-124)
|
||||
self.c.executemany('''delete from redeem_item where code=? and item_id=? and type=?''', [
|
||||
(self.code, i.item_id, i.item_type) for i in items])
|
||||
for i in items:
|
||||
self.items.remove(i)
|
||||
|
||||
def add_items(self, items: list) -> None:
|
||||
'''添加物品到redeem_item表'''
|
||||
for i in items:
|
||||
if not i.select_exists():
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}`', api_error_code=-121)
|
||||
if i in self.items:
|
||||
raise DataExist(
|
||||
f'Item `{i.item_type}`: `{i.item_id}` already exists in redeem `{self.code}`', api_error_code=-123)
|
||||
self.c.executemany('''insert into redeem_item values(?,?,?,?)''', [
|
||||
(self.code, i.item_id, i.item_type, i.amount) for i in items])
|
||||
self.items.extend(items)
|
||||
|
||||
def update_items(self, items: list) -> None:
|
||||
'''更新redeem_item表中的物品'''
|
||||
for i in items:
|
||||
if i not in self.items:
|
||||
raise NoData(
|
||||
f'No such item `{i.item_type}`: `{i.item_id}` in redeem `{self.code}`', api_error_code=-124)
|
||||
self.c.executemany('''update redeem_item set amount=? where code=? and item_id=? and type=?''', [
|
||||
(i.amount, self.code, i.item_id, i.item_type) for i in items])
|
||||
|
||||
for i in items:
|
||||
self.items[self.items.index(i)].amount = i.amount
|
||||
|
||||
|
||||
class UserRedeem(Redeem):
|
||||
'''
|
||||
用户兑换码类\
|
||||
用户兑换码类
|
||||
|
||||
properties: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
@@ -165,7 +138,7 @@ class UserRedeem(Redeem):
|
||||
if self.redeem_type == 0:
|
||||
raise RedeemUnavailable(
|
||||
'The redeem `%s` is unavailable.' % self.code)
|
||||
elif self.redeem_type == 1:
|
||||
if self.redeem_type == 1:
|
||||
raise RedeemUnavailable(
|
||||
'The redeem `%s` is unavailable.' % self.code, 506)
|
||||
|
||||
|
||||
@@ -11,6 +11,8 @@ class SaveData:
|
||||
|
||||
def __init__(self, c=None) -> None:
|
||||
self.c = c
|
||||
self.user = None
|
||||
|
||||
self.scores_data = []
|
||||
self.clearlamps_data = []
|
||||
self.clearedsongs_data = []
|
||||
@@ -127,7 +129,7 @@ class SaveData:
|
||||
'Property `%s` is not found in the instance of `SaveData` class.' % key)
|
||||
|
||||
if md5(value) == checksum:
|
||||
if key == 'installid_data' or key == 'devicemodelname_data' or key == 'finalestate_data':
|
||||
if key in ('installid_data', 'devicemodelname_data', 'finalestate_data'):
|
||||
self.__dict__[key] = json.loads(value)['val']
|
||||
else:
|
||||
self.__dict__[key] = json.loads(value)['']
|
||||
|
||||
@@ -14,6 +14,8 @@ from .world import WorldPlay
|
||||
|
||||
class Score:
|
||||
def __init__(self) -> None:
|
||||
self.c = None
|
||||
|
||||
self.song: 'Chart' = Chart()
|
||||
self.score: int = None
|
||||
self.shiny_perfect_count: int = None
|
||||
@@ -45,18 +47,17 @@ class Score:
|
||||
'''分数转换为评级'''
|
||||
if score >= 9900000: # EX+
|
||||
return 6
|
||||
elif 9800000 <= score < 9900000: # EX
|
||||
if score >= 9800000: # EX
|
||||
return 5
|
||||
elif 9500000 <= score < 9800000: # AA
|
||||
if score >= 9500000: # AA
|
||||
return 4
|
||||
elif 9200000 <= score < 9500000: # A
|
||||
if score >= 9200000: # A
|
||||
return 3
|
||||
elif 8900000 <= score < 9200000: # B
|
||||
if score >= 8900000: # B
|
||||
return 2
|
||||
elif 8600000 <= score < 8900000: # C
|
||||
if score >= 8600000: # C
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
return 0
|
||||
|
||||
@property
|
||||
def song_grade(self) -> int:
|
||||
@@ -67,16 +68,15 @@ class Score:
|
||||
'''clear_type转换为成绩状态,用数字大小标识便于比较'''
|
||||
if clear_type == 3: # PM
|
||||
return 5
|
||||
elif clear_type == 2: # FC
|
||||
if clear_type == 2: # FC
|
||||
return 4
|
||||
elif clear_type == 5: # Hard Clear
|
||||
if clear_type == 5: # Hard Clear
|
||||
return 3
|
||||
elif clear_type == 1: # Clear
|
||||
if clear_type == 1: # Clear
|
||||
return 2
|
||||
elif clear_type == 4: # Easy Clear
|
||||
if clear_type == 4: # Easy Clear
|
||||
return 1
|
||||
else: # Track Lost
|
||||
return 0
|
||||
return 0 # Track Lost
|
||||
|
||||
@property
|
||||
def song_state(self) -> int:
|
||||
@@ -112,8 +112,7 @@ class Score:
|
||||
ptt = defnum + 2
|
||||
elif score < 9800000:
|
||||
ptt = defnum + (score-9500000) / 300000
|
||||
if ptt < 0:
|
||||
ptt = 0
|
||||
ptt = max(ptt, 0)
|
||||
else:
|
||||
ptt = defnum + 1 + (score-9800000) / 200000
|
||||
|
||||
@@ -218,6 +217,7 @@ class UserPlay(UserScore):
|
||||
self.course_play: 'CoursePlay' = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
# 不能super
|
||||
if self.is_world_mode is None or self.course_play_state is None:
|
||||
return {}
|
||||
if self.course_play_state == 4:
|
||||
@@ -302,7 +302,7 @@ class UserPlay(UserScore):
|
||||
x = self.c.fetchone()
|
||||
if x:
|
||||
self.prog_boost_multiply = 300 if x[0] == 300 else 0
|
||||
if x[1] < self.beyond_boost_gauge_usage or (self.beyond_boost_gauge_usage != 100 and self.beyond_boost_gauge_usage != 200):
|
||||
if x[1] < self.beyond_boost_gauge_usage or self.beyond_boost_gauge_usage not in (100, 200):
|
||||
# 注意:偷懒了,没判断是否是beyond图
|
||||
self.beyond_boost_gauge_usage = 0
|
||||
|
||||
@@ -374,8 +374,8 @@ class UserPlay(UserScore):
|
||||
'''更新此分数对应用户的recent30'''
|
||||
old_recent_10 = self.ptt.recent_10
|
||||
if self.is_protected:
|
||||
old_r30 = [x for x in self.ptt.r30]
|
||||
old_s30 = [x for x in self.ptt.s30]
|
||||
old_r30 = self.ptt.r30.copy()
|
||||
old_s30 = self.ptt.s30.copy()
|
||||
|
||||
# 寻找pop位置
|
||||
songs = list(set(self.ptt.s30))
|
||||
@@ -479,7 +479,8 @@ class UserPlay(UserScore):
|
||||
|
||||
class Potential:
|
||||
'''
|
||||
用户潜力值计算处理类\
|
||||
用户潜力值计算处理类
|
||||
|
||||
property: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
@@ -487,8 +488,8 @@ class Potential:
|
||||
self.c = c
|
||||
self.user = user
|
||||
|
||||
self.r30: list = None
|
||||
self.s30: list = None
|
||||
self.r30: 'list[float]' = None
|
||||
self.s30: 'list[str]' = None
|
||||
self.songs_selected: list = None
|
||||
|
||||
self.b30: list = None
|
||||
@@ -503,7 +504,7 @@ class Potential:
|
||||
'''获取用户best30的总潜力值'''
|
||||
self.c.execute('''select rating from best_score where user_id = :a order by rating DESC limit 30''', {
|
||||
'a': self.user.user_id})
|
||||
return sum([x[0] for x in self.c.fetchall()])
|
||||
return sum(x[0] for x in self.c.fetchall())
|
||||
|
||||
def select_recent_30(self) -> None:
|
||||
'''获取用户recent30数据'''
|
||||
@@ -578,7 +579,8 @@ class Potential:
|
||||
|
||||
class UserScoreList:
|
||||
'''
|
||||
用户分数查询类\
|
||||
用户分数查询类
|
||||
|
||||
properties: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
|
||||
@@ -7,6 +7,8 @@ class Chart:
|
||||
|
||||
def __init__(self, c=None, song_id: str = None, difficulty: int = None) -> None:
|
||||
self.c = c
|
||||
self.song_id: str = None
|
||||
self.difficulty: int = None
|
||||
self.set_chart(song_id, difficulty)
|
||||
self.defnum: int = None
|
||||
self.song_name: str = None
|
||||
|
||||
@@ -12,8 +12,8 @@ class Connect:
|
||||
|
||||
def __init__(self, file_path: str = Constant.SQLITE_DATABASE_PATH, in_memory: bool = False, logger=None) -> None:
|
||||
"""
|
||||
数据库连接,默认连接arcaea_database.db\
|
||||
接受:文件路径\
|
||||
数据库连接,默认连接arcaea_database.db
|
||||
接受:文件路径
|
||||
返回:sqlite3连接操作对象
|
||||
"""
|
||||
self.file_path = file_path
|
||||
@@ -21,6 +21,9 @@ class Connect:
|
||||
if logger is not None:
|
||||
self.logger = logger
|
||||
|
||||
self.conn: sqlite3.Connection = None
|
||||
self.c: sqlite3.Cursor = None
|
||||
|
||||
def __enter__(self) -> sqlite3.Cursor:
|
||||
if self.in_memory:
|
||||
self.conn = sqlite3.connect(
|
||||
@@ -144,19 +147,19 @@ class Query:
|
||||
raise InputError(api_error_code=-104)
|
||||
self.__sort = sort
|
||||
|
||||
def set_value(self, limit=-1, offset=0, query={}, fuzzy_query={}, sort=[]) -> None:
|
||||
def set_value(self, limit=-1, offset=0, query=None, fuzzy_query=None, sort=None) -> None:
|
||||
self.limit = limit
|
||||
self.offset = offset
|
||||
self.query = query
|
||||
self.fuzzy_query = fuzzy_query
|
||||
self.sort = sort
|
||||
self.query = query if query is not None else {}
|
||||
self.fuzzy_query = fuzzy_query if fuzzy_query is not None else {}
|
||||
self.sort = sort if sort is not None else []
|
||||
|
||||
def from_dict(self, d: dict) -> 'Query':
|
||||
self.set_value(d.get('limit', -1), d.get('offset', 0),
|
||||
d.get('query', {}), d.get('fuzzy_query', {}), d.get('sort', []))
|
||||
return self
|
||||
|
||||
def from_args(self, query: dict, limit: int = -1, offset: int = 0, sort: list = [], fuzzy_query: dict = {}) -> 'Query':
|
||||
def from_args(self, query: dict, limit: int = -1, offset: int = 0, sort: list = None, fuzzy_query: dict = None) -> 'Query':
|
||||
self.set_value(limit, offset, query, fuzzy_query, sort)
|
||||
return self
|
||||
|
||||
@@ -170,7 +173,7 @@ class Sql:
|
||||
self.c = c
|
||||
|
||||
@staticmethod
|
||||
def get_select_sql(table_name: str, target_column: list = [], query: 'Query' = None):
|
||||
def get_select_sql(table_name: str, target_column: list = None, query: 'Query' = None):
|
||||
'''拼接单表内行查询单句sql语句,返回语句和参数列表'''
|
||||
sql_list = []
|
||||
if not target_column:
|
||||
@@ -210,8 +213,10 @@ class Sql:
|
||||
return sql, sql_list
|
||||
|
||||
@staticmethod
|
||||
def get_insert_sql(table_name: str, key: list = [], value_len: int = None, insert_type: str = None) -> str:
|
||||
def get_insert_sql(table_name: str, key: list = None, value_len: int = None, insert_type: str = None) -> str:
|
||||
'''拼接insert语句,请注意只返回sql语句,insert_type为replace或ignore'''
|
||||
if key is None:
|
||||
key = []
|
||||
insert_type = 'replace' if insert_type in [
|
||||
'replace', 'R', 'r', 'REPLACE'] else 'ignore'
|
||||
return ('insert into ' if insert_type is None else 'insert or ' + insert_type + ' into ') + table_name + ('(' + ','.join(key) + ')' if key else '') + ' values(' + ','.join(['?'] * (len(key) if value_len is None else value_len)) + ')'
|
||||
@@ -281,13 +286,13 @@ class Sql:
|
||||
|
||||
return sql, sql_list
|
||||
|
||||
def select(self, table_name: str, target_column: list = [], query: 'Query' = None) -> list:
|
||||
def select(self, table_name: str, target_column: list = None, query: 'Query' = None) -> list:
|
||||
'''单表内行select单句sql语句,返回fetchall数据'''
|
||||
sql, sql_list = self.get_select_sql(table_name, target_column, query)
|
||||
self.c.execute(sql, sql_list)
|
||||
return self.c.fetchall()
|
||||
|
||||
def select_exists(self, table_name: str, target_column: list = [], query: 'Query' = None) -> bool:
|
||||
def select_exists(self, table_name: str, target_column: list = None, query: 'Query' = None) -> bool:
|
||||
'''单表内行select exists单句sql语句,返回bool值'''
|
||||
sql, sql_list = self.get_select_sql(table_name, target_column, query)
|
||||
self.c.execute('select exists(' + sql + ')', sql_list)
|
||||
@@ -329,7 +334,7 @@ class Sql:
|
||||
pk = []
|
||||
name = []
|
||||
|
||||
self.c.execute('''pragma table_info ("%s")''' % table_name) # 这里无法参数化
|
||||
self.c.execute(f'''pragma table_info ("{table_name}")''') # 这里无法参数化
|
||||
x = self.c.fetchall()
|
||||
if x:
|
||||
for i in x:
|
||||
@@ -390,8 +395,8 @@ class DatabaseMigrator:
|
||||
'''
|
||||
with Connect(self.c2_path) as c2:
|
||||
with Connect(self.c1_path) as c1:
|
||||
[self.update_one_table(c1, c2, i)
|
||||
for i in Constant.DATABASE_MIGRATE_TABLES]
|
||||
for i in Constant.DATABASE_MIGRATE_TABLES:
|
||||
self.update_one_table(c1, c2, i)
|
||||
|
||||
if not Constant.UPDATE_WITH_NEW_CHARACTER_DATA:
|
||||
self.update_one_table(c1, c2, 'character')
|
||||
@@ -406,7 +411,7 @@ class MemoryDatabase:
|
||||
self.c = self.conn.cursor()
|
||||
self.c.execute('''PRAGMA journal_mode = OFF''')
|
||||
self.c.execute('''PRAGMA synchronous = 0''')
|
||||
self.c.execute('''create table if not exists download_token(user_id int,
|
||||
self.c.execute('''create table if not exists download_token(user_id int,
|
||||
song_id text,file_name text,token text,time int,primary key(user_id, song_id, file_name));''')
|
||||
self.c.execute(
|
||||
'''create index if not exists download_token_1 on download_token (song_id, file_name);''')
|
||||
|
||||
@@ -13,7 +13,7 @@ class GameInfo:
|
||||
"stamina_recover_tick": Constant.STAMINA_RECOVER_TICK,
|
||||
"core_exp": Constant.CORE_EXP,
|
||||
"curr_ts": int(time()*1000),
|
||||
"level_steps": [{'level': i, 'level_exp': Constant.LEVEL_STEPS[i]} for i in Constant.LEVEL_STEPS],
|
||||
"level_steps": [{'level': k, 'level_exp': v} for k, v in Constant.LEVEL_STEPS.items()],
|
||||
"world_ranking_enabled": True,
|
||||
"is_byd_chapter_unlocked": True
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import base64
|
||||
import hashlib
|
||||
import time
|
||||
from os import urandom
|
||||
from random import randint
|
||||
|
||||
from .character import UserCharacter, UserCharacterList
|
||||
from .config_manager import Config
|
||||
@@ -100,7 +101,6 @@ class UserRegister(User):
|
||||
|
||||
def _build_user_code(self):
|
||||
# 生成9位的user_code,用的自然是随机
|
||||
from random import randint
|
||||
random_times = 0
|
||||
|
||||
while random_times <= 1000:
|
||||
@@ -148,7 +148,7 @@ class UserRegister(User):
|
||||
self._build_user_id()
|
||||
self._insert_user_char()
|
||||
|
||||
self.c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
|
||||
self.c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
|
||||
character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled, current_map, ticket, prog_boost, email)
|
||||
values(:user_id, :name, :password, :join_date, :user_code, 0, 0, 0, 0, 0, 0, -1, 0, '', :memories, 0, :email)
|
||||
''', {'user_code': self.user_code, 'user_id': self.user_id, 'join_date': now, 'name': self.name, 'password': self.hash_pwd, 'memories': Config.DEFAULT_MEMORIES, 'email': self.email})
|
||||
@@ -309,6 +309,8 @@ class UserInfo(User):
|
||||
self.beyond_boost_gauge: float = 0
|
||||
self.next_fragstam_ts: int = None
|
||||
self.world_mode_locked_end_ts: int = None
|
||||
self.current_map: 'Map' = None
|
||||
self.stamina: 'UserStamina' = None
|
||||
|
||||
self.__cores: list = None
|
||||
self.__packs: list = None
|
||||
@@ -406,7 +408,7 @@ class UserInfo(User):
|
||||
self.c.execute('''select exists(select * from friend where user_id_me = :x and user_id_other = :y)''',
|
||||
{'x': i[0], 'y': self.user_id})
|
||||
|
||||
is_mutual = True if self.c.fetchone() == (1,) else False
|
||||
is_mutual = self.c.fetchone() == (1,)
|
||||
|
||||
you = UserOnline(self.c, i[0])
|
||||
you.select_user()
|
||||
@@ -626,7 +628,7 @@ class UserInfo(User):
|
||||
self.character.is_uncapped_override = x[5] == 1
|
||||
self.current_map = UserMap(self.c, x[6], self)
|
||||
self.world_mode_locked_end_ts = x[7] if x[7] else -1
|
||||
self.beyond_boost_gauge = x[8] if x[8] else 0
|
||||
self.beyond_boost_gauge = x[8] if x[8] else 0
|
||||
|
||||
@property
|
||||
def global_rank(self) -> int:
|
||||
@@ -660,16 +662,16 @@ class UserInfo(User):
|
||||
|
||||
score_sum = 0
|
||||
if len(song_list_ftr) >= 2:
|
||||
self.c.execute('''select sum(score) from best_score where user_id=? and difficulty=2 and song_id in ({0})'''.format(
|
||||
','.join(['?']*(len(song_list_ftr)-1))), tuple(song_list_ftr))
|
||||
self.c.execute(
|
||||
f'''select sum(score) from best_score where user_id=? and difficulty=2 and song_id in ({','.join(['?']*(len(song_list_ftr)-1))})''', tuple(song_list_ftr))
|
||||
|
||||
x = self.c.fetchone()
|
||||
if x[0] is not None:
|
||||
score_sum += x[0]
|
||||
|
||||
if len(song_list_byn) >= 2:
|
||||
self.c.execute('''select sum(score) from best_score where user_id=? and difficulty=3 and song_id in ({0})'''.format(
|
||||
','.join(['?']*(len(song_list_byn)-1))), tuple(song_list_byn))
|
||||
self.c.execute(
|
||||
f'''select sum(score) from best_score where user_id=? and difficulty=3 and song_id in ({','.join(['?']*(len(song_list_byn)-1))})''', tuple(song_list_byn))
|
||||
|
||||
x = self.c.fetchone()
|
||||
if x[0] is not None:
|
||||
@@ -682,13 +684,13 @@ class UserInfo(User):
|
||||
|
||||
def select_user_one_column(self, column_name: str, default_value=None) -> None:
|
||||
'''
|
||||
查询user表的某个属性\
|
||||
查询user表的某个属性
|
||||
请注意必须是一个普通属性,不能是一个类的实例
|
||||
'''
|
||||
if column_name not in self.__dict__:
|
||||
raise InputError('No such column.')
|
||||
self.c.execute('''select %s from user where user_id = :a''' %
|
||||
column_name, {'a': self.user_id})
|
||||
self.c.execute(f'''select {column_name} from user where user_id = :a''', {
|
||||
'a': self.user_id})
|
||||
x = self.c.fetchone()
|
||||
if not x:
|
||||
raise NoData('No user.', 108, -3)
|
||||
@@ -697,15 +699,15 @@ class UserInfo(User):
|
||||
|
||||
def update_user_one_column(self, column_name: str, value=None) -> None:
|
||||
'''
|
||||
更新user表的某个属性\
|
||||
更新user表的某个属性
|
||||
请注意必须是一个普通属性,不能是一个类的实例
|
||||
'''
|
||||
if column_name not in self.__dict__:
|
||||
raise InputError('No such column.')
|
||||
if value is not None:
|
||||
self.__dict__[column_name] = value
|
||||
self.c.execute('''update user set %s = :a where user_id = :b''' %
|
||||
column_name, {'a': self.__dict__[column_name], 'b': self.user_id})
|
||||
self.c.execute(f'''update user set {column_name} = :a where user_id = :b''', {
|
||||
'a': self.__dict__[column_name], 'b': self.user_id})
|
||||
|
||||
|
||||
class UserOnline(UserInfo):
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import json
|
||||
import os
|
||||
from functools import lru_cache
|
||||
from json import load
|
||||
from random import random
|
||||
from time import time
|
||||
|
||||
@@ -25,15 +25,15 @@ def get_world_name(file_dir: str = Constant.WORLD_MAP_FOLDER_PATH) -> list:
|
||||
def get_world_info(map_id: str) -> dict:
|
||||
'''读取json文件内容,返回字典'''
|
||||
world_info = {}
|
||||
with open(os.path.join(Constant.WORLD_MAP_FOLDER_PATH, map_id+'.json'), 'r') as f:
|
||||
world_info = json.load(f)
|
||||
with open(os.path.join(Constant.WORLD_MAP_FOLDER_PATH, f'{map_id}.json'), 'rb') as f:
|
||||
world_info = load(f)
|
||||
|
||||
return world_info
|
||||
|
||||
|
||||
def get_world_all(c, user) -> list:
|
||||
'''
|
||||
读取所有地图信息,返回列表\
|
||||
读取所有地图信息,返回列表
|
||||
parameter: `user` - `User`类或子类的实例
|
||||
'''
|
||||
worlds = get_world_name()
|
||||
@@ -44,7 +44,7 @@ class Step:
|
||||
'''台阶类'''
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.postion: int = None
|
||||
self.position: int = None
|
||||
self.capture: int = None
|
||||
self.items: list = []
|
||||
self.restrict_id: str = None
|
||||
@@ -198,7 +198,7 @@ class Map:
|
||||
|
||||
class UserMap(Map):
|
||||
'''
|
||||
用户地图类\
|
||||
用户地图类
|
||||
parameters: `user` - `User`类或者子类的实例
|
||||
'''
|
||||
|
||||
@@ -413,7 +413,8 @@ class Stamina:
|
||||
|
||||
class UserStamina(Stamina):
|
||||
'''
|
||||
用户体力类\
|
||||
用户体力类
|
||||
|
||||
parameter: `user` - `User`类或子类的实例
|
||||
'''
|
||||
|
||||
@@ -439,8 +440,9 @@ class UserStamina(Stamina):
|
||||
|
||||
class WorldPlay:
|
||||
'''
|
||||
世界模式打歌类,处理特殊角色技能,联动UserMap和UserPlay\
|
||||
parameter: `user` - `UserOnline`类或子类的实例\
|
||||
世界模式打歌类,处理特殊角色技能,联动UserMap和UserPlay
|
||||
|
||||
parameter: `user` - `UserOnline`类或子类的实例
|
||||
'user_play` - `UserPlay`类的实例
|
||||
'''
|
||||
|
||||
@@ -598,8 +600,8 @@ class WorldPlay:
|
||||
if self.user_play.beyond_gauge == 0:
|
||||
# 更新byd大招蓄力条
|
||||
self.user.beyond_boost_gauge += self.beyond_boost_gauge_addition
|
||||
if self.user.beyond_boost_gauge > 200:
|
||||
self.user.beyond_boost_gauge = 200
|
||||
self.user.beyond_boost_gauge = min(
|
||||
self.user.beyond_boost_gauge, 200)
|
||||
self.user.update_user_one_column(
|
||||
'beyond_boost_gauge', self.user.beyond_boost_gauge)
|
||||
elif self.user_play.beyond_boost_gauge_usage != 0 and self.user_play.beyond_boost_gauge_usage <= self.user.beyond_boost_gauge:
|
||||
@@ -684,7 +686,7 @@ class WorldPlay:
|
||||
|
||||
def _skill_vita(self) -> None:
|
||||
'''
|
||||
vita技能,overdrive随回忆率提升,提升量最多为10\
|
||||
vita技能,overdrive随回忆率提升,提升量最多为10
|
||||
此处采用线性函数
|
||||
'''
|
||||
self.over_skill_increase = 0
|
||||
@@ -745,7 +747,7 @@ class WorldPlay:
|
||||
'''
|
||||
x: 'Step' = self.user.current_map.steps_for_climbing[0]
|
||||
if ('randomsong' in x.step_type or 'speedlimit' in x.step_type) and self.user_play.song_grade < 5:
|
||||
self.character_bonus_progress = -self.step_value / 2 / self.step_times
|
||||
self.character_bonus_progress = -1 * self.step_value / 2 / self.step_times
|
||||
self.step_value = self.step_value / 2
|
||||
self.user.current_map.reclimb(self.step_value)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user