Code refactoring

- Code refactoring
- Fix a bug that the other player will not become the host of the room at once, when the player disconnect in link play.

> Maybe add many unknown bugs. XD
> The song database `arcsong.db` will not used in the future. You can use a tool in `tool` folder to import old data.
This commit is contained in:
Lost-MSth
2022-07-04 18:36:30 +08:00
parent 9de62d3645
commit 6fcca17918
49 changed files with 3663 additions and 3660 deletions

View File

@@ -1,3 +1,663 @@
import json
import os
from functools import lru_cache
from random import random
from time import time
from .character import Character
from .constant import Constant
from .error import InputError, MapLocked, NoData
from .item import ItemFactory
@lru_cache(maxsize=128)
def get_world_name(file_dir: str = Constant.WORLD_MAP_FOLDER_PATH) -> list:
'''获取所有地图名称,返回列表'''
file_list = []
for root, dirs, files in os.walk(file_dir):
for file in files:
if os.path.splitext(file)[1] == '.json':
file_list.append(os.path.splitext(file)[0])
return file_list
@lru_cache(maxsize=128)
def get_world_info(map_id: str) -> dict:
'''读取json文件内容返回字典'''
world_info = {}
with open(os.path.join(Constant.WORLD_MAP_FOLDER_PATH, map_id+'.json'), 'r') as f:
world_info = json.load(f)
return world_info
def get_world_all(c, user) -> list:
'''
读取所有地图信息,返回列表\
parameter: `user` - `User`类或子类的实例
'''
worlds = get_world_name()
return [UserMap(c, map_id, user) for map_id in worlds]
class Step:
'''台阶类'''
def __init__(self) -> None:
self.postion: int = None
self.capture: int = None
self.items: list = []
self.restrict_id: str = None
self.restrict_ids: list = []
self.restrict_type: str = None
self.step_type: list = None
self.speed_limit_value: int = None
self.plus_stamina_value: int = None
@property
def to_dict(self) -> dict:
r = {
'position': self.position,
'capture': self.capture,
}
if self.items:
r['items'] = [i.to_dict() for i in self.items]
if self.restrict_id:
r['restrict_id'] = self.restrict_id
if self.restrict_ids:
r['restrict_ids'] = self.restrict_ids
if self.restrict_type:
r['restrict_type'] = self.restrict_type
if self.step_type:
r['step_type'] = self.step_type
if self.speed_limit_value:
r['speed_limit_value'] = self.speed_limit_value
if self.plus_stamina_value:
r['plus_stamina_value'] = self.plus_stamina_value
return r
def from_dict(self, d: dict) -> 'Step':
self.position = d['position']
self.capture = d['capture']
self.restrict_id = d.get('restrict_id')
self.restrict_ids = d.get('restrict_ids')
self.restrict_type = d.get('restrict_type')
self.step_type = d.get('step_type')
self.speed_limit_value = d.get('speed_limit_value')
self.plus_stamina_value = d.get('plus_stamina_value')
if 'items' in d:
self.items = [ItemFactory.from_dict(i) for i in d['items']]
return self
class Map:
def __init__(self, map_id: str = None) -> None:
self.map_id = map_id
self.map_id: str = map_id
self.is_legacy: bool = None
self.is_beyond: bool = None
self.beyond_health: int = None
self.character_affinity: list = []
self.affinity_multiplier: list = []
self.chapter: int = None
self.available_from: int = None
self.available_to: int = None
self.is_repeatable: bool = None
self.require_id: str = None
self.require_type: str = None
self.require_value: int = None
self.coordinate: str = None
self.custom_bg: str = None
self.stamina_cost: int = None
self.steps: list = []
self.__rewards: list = None
@property
def rewards(self) -> list:
if self.__rewards is None:
self.get_rewards()
return self.__rewards
def get_rewards(self) -> list:
if self.steps:
self.__rewards = []
for step in self.steps:
if step.items:
self.__rewards.append(
{'items': [i.to_dict() for i in step.items], 'position': step.position})
return self.__rewards
@property
def step_count(self):
return len(self.steps)
def to_dict(self) -> dict:
if self.chapter is None:
self.select_map_info()
return {
'map_id': self.map_id,
'is_legacy': self.is_legacy,
'is_beyond': self.is_beyond,
'beyond_health': self.beyond_health,
'character_affinity': self.character_affinity,
'affinity_multiplier': self.affinity_multiplier,
'chapter': self.chapter,
'available_from': self.available_from,
'available_to': self.available_to,
'is_repeatable': self.is_repeatable,
'require_id': self.require_id,
'require_type': self.require_type,
'require_value': self.require_value,
'coordinate': self.coordinate,
'custom_bg': self.custom_bg,
'stamina_cost': self.stamina_cost,
'step_count': self.step_count,
'steps': [s.to_dict for s in self.steps],
}
def from_dict(self, raw_dict: dict) -> 'Map':
self.is_legacy = raw_dict.get('is_legacy')
self.is_beyond = raw_dict.get('is_beyond')
self.beyond_health = raw_dict.get('beyond_health')
self.character_affinity = raw_dict.get('character_affinity')
self.affinity_multiplier = raw_dict.get('affinity_multiplier')
self.chapter = raw_dict.get('chapter')
self.available_from = raw_dict.get('available_from')
self.available_to = raw_dict.get('available_to')
self.is_repeatable = raw_dict.get('is_repeatable')
self.require_id = raw_dict.get('require_id')
self.require_type = raw_dict.get('require_type')
self.require_value = raw_dict.get('require_value')
self.coordinate = raw_dict.get('coordinate')
self.custom_bg = raw_dict.get('custom_bg')
self.stamina_cost = raw_dict.get('stamina_cost')
self.steps = [Step().from_dict(s) for s in raw_dict.get('steps')]
return self
def select_map_info(self):
'''获取地图信息'''
self.from_dict(get_world_info(self.map_id))
class UserMap(Map):
'''
用户地图类\
parameters: `user` - `User`类或者子类的实例
'''
def __init__(self, c=None, map_id: str = None, user=None) -> None:
super().__init__(map_id)
self.c = c
self.curr_position: int = None
self.curr_capture: int = None
self.is_locked: bool = None
self.prev_position: int = None
self.prev_capture: int = None
self.user = user
@property
def rewards_for_climbing(self) -> list:
rewards = []
for i in range(self.prev_position, self.curr_position+1):
step = self.steps[i]
if step.items:
rewards.append(
{'items': step.items, 'position': step.position})
return rewards
@property
def rewards_for_climbing_to_dict(self) -> list:
rewards = []
for i in range(self.prev_position, self.curr_position+1):
step = self.steps[i]
if step.items:
rewards.append(
{'items': [i.to_dict() for i in step.items], 'position': step.position})
return rewards
@property
def steps_for_climbing(self) -> list:
return self.steps[self.prev_position:self.curr_position+1]
def to_dict(self, has_map_info: bool = False, has_steps: bool = False, has_rewards: bool = False) -> dict:
if self.is_locked is None:
self.select()
if has_map_info:
if self.chapter is None:
self.select_map_info()
r = super().to_dict()
r['curr_position'] = self.curr_position
r['curr_capture'] = self.curr_capture
r['is_locked'] = self.is_locked
r['user_id'] = self.user.user_id
if not has_steps:
del r['steps']
if has_rewards:
r['rewards'] = self.rewards
else:
r = {
'map_id': self.map_id,
'curr_position': self.curr_position,
'curr_capture': self.curr_capture,
'is_locked': self.is_locked,
'user_id': self.user.user_id,
}
return r
def initialize(self):
'''初始化数据库信息'''
self.c.execute('''insert into user_world values(:a,:b,0,0,1)''', {
'a': self.user.user_id, 'b': self.map_id})
def update(self):
'''向数据库更新信息'''
self.c.execute('''update user_world set curr_position=:a,curr_capture=:b,is_locked=:c where user_id=:d and map_id=:e''', {
'a': self.curr_position, 'b': self.curr_capture, 'c': 1 if self.is_locked else 0, 'd': self.user.user_id, 'e': self.map_id})
def select(self):
'''获取用户在此地图的信息'''
self.c.execute('''select curr_position, curr_capture, is_locked from user_world where map_id = :a and user_id = :b''',
{'a': self.map_id, 'b': self.user.user_id})
x = self.c.fetchone()
if x:
self.curr_position = x[0]
self.curr_capture = x[1]
self.is_locked = x[2] == 1
else:
self.curr_position = 0
self.curr_capture = 0
self.is_locked = True
self.initialize()
def change_user_current_map(self):
'''改变用户当前地图为此地图'''
self.user.current_map = self
self.c.execute('''update user set current_map = :a where user_id=:b''', {
'a': self.map_id, 'b': self.user.user_id})
def unlock(self) -> bool:
'''解锁用户此地图返回成功与否bool值'''
self.select()
if self.is_locked:
self.is_locked = False
self.curr_position = 0
self.curr_capture = 0
self.select_map_info()
if self.require_type is not None and self.require_type != '':
if self.require_type in ['pack', 'single']:
item = ItemFactory(self.c).get_item(self.require_type)
item.item_id = self.require_id
item.select(self.user)
if not item.amount:
self.is_locked = True
self.update()
return not self.is_locked
def climb(self, step_value: float) -> None:
'''爬梯子,数值非负'''
if step_value < 0:
raise InputError('`Step_value` must be non-negative.')
if self.curr_position is None:
self.select()
if self.is_beyond is None:
self.select_map_info()
if self.is_locked:
raise MapLocked('The map is locked.')
self.prev_capture = self.curr_capture
self.prev_position = self.curr_position
if self.is_beyond: # beyond判断
dt = self.beyond_health - self.prev_capture
self.curr_capture = self.prev_capture + \
step_value if dt >= step_value else self.beyond_health
i = 0
t = self.prev_capture + step_value
while i < self.step_count and t > 0:
dt = self.steps[i].capture
if dt > t:
t = 0
else:
t -= dt
i += 1
if i >= self.step_count:
self.curr_position = self.step_count - 1
else:
self.curr_position = i
else:
i = self.prev_position
j = self.prev_capture
t = step_value
while t > 0 and i < self.step_count:
dt = self.steps[i].capture - j
if dt > t:
j += t
t = 0
else:
t -= dt
j = 0
i += 1
if i >= self.step_count:
self.curr_position = self.step_count - 1
self.curr_capture = 0
else:
self.curr_position = i
self.curr_capture = j
def reclimb(self, step_value: float) -> None:
'''重新爬梯子计算'''
self.curr_position = self.prev_position
self.curr_capture = self.prev_capture
self.climb(step_value)
class Stamina:
'''
体力类
'''
def __init__(self) -> None:
self.__stamina: int = None
self.max_stamina_ts: int = None
def set_value(self, max_stamina_ts: int, stamina: int):
self.max_stamina_ts = int(max_stamina_ts) if max_stamina_ts else 0
self.__stamina = int(stamina) if stamina else Constant.MAX_STAMINA
@property
def stamina(self) -> int:
'''通过计算得到当前的正确体力值'''
stamina = int(Constant.MAX_STAMINA - (self.max_stamina_ts -
int(time()*1000)) / Constant.STAMINA_RECOVER_TICK)
if stamina >= Constant.MAX_STAMINA:
if self.__stamina >= Constant.MAX_STAMINA:
stamina = self.__stamina
else:
stamina = Constant.MAX_STAMINA
return stamina if stamina > 0 else 0
@stamina.setter
def stamina(self, value: int) -> None:
'''设置体力值此处会导致max_stamina_ts变化'''
self.__stamina = int(value)
self.max_stamina_ts = int(
time()*1000) - (self.__stamina-Constant.MAX_STAMINA) * Constant.STAMINA_RECOVER_TICK
class UserStamina(Stamina):
'''
用户体力类\
parameter: `user` - `User`类或子类的实例
'''
def __init__(self, c=None, user=None) -> None:
super().__init__()
self.c = c
self.user = user
def select(self):
'''获取用户体力信息'''
self.c.execute('''select max_stamina_ts, staminafrom user where user_id = :a''',
{'a': self.user.user_id})
x = self.c.fetchone()
if not x:
raise NoData('The user does not exist.')
self.set_value(x[0], x[1])
def update(self):
'''向数据库更新信息'''
self.c.execute('''update user set max_stamina_ts=:b, stamina=:a where user_id=:c''', {
'a': self.stamina, 'b': self.max_stamina_ts, 'c': self.user.user_id})
class WorldPlay:
'''
世界模式打歌类处理特殊角色技能联动UserMap和UserPlay\
parameter: `user` - `UserOnline`类或子类的实例\
'user_play` - `UserPlay`类的实例
'''
def __init__(self, c=None, user=None, user_play=None) -> None:
self.c = c
self.user = user
self.user_play = user_play
self.character_used = None
self.base_step_value: float = None
self.step_value: float = None
self.prog_tempest: float = None
self.overdrive_extra: float = None
self.character_bonus_progress: float = None
@property
def to_dict(self) -> dict:
arcmap: 'UserMap' = self.user.current_map
r = {
"rewards": arcmap.rewards_for_climbing_to_dict,
"exp": self.character_used.level.exp,
"level": self.character_used.level.level,
"base_progress": self.base_step_value,
"progress": self.step_value,
"user_map": {
"user_id": self.user.user_id,
"curr_position": arcmap.curr_position,
"curr_capture": arcmap.curr_capture,
"is_locked": arcmap.is_locked,
"map_id": arcmap.map_id,
"prev_capture": arcmap.prev_capture,
"prev_position": arcmap.prev_position,
"beyond_health": arcmap.beyond_health
},
"char_stats": {
"character_id": self.character_used.character_id,
"frag": self.character_used.frag.get_value(self.character_used.level),
"prog": self.character_used.prog.get_value(self.character_used.level),
"overdrive": self.character_used.overdrive.get_value(self.character_used.level)
},
"current_stamina": self.user.stamina.stamina,
"max_stamina_ts": self.user.stamina.max_stamina_ts
}
if self.overdrive_extra is not None:
r['char_stats']['overdrive'] += self.overdrive_extra
if self.prog_tempest is not None:
r['char_stats']['prog'] += self.prog_tempest
r['char_stats']['prog_tempest'] = self.prog_tempest
if self.character_bonus_progress is not None:
# 猜的,为了让客户端正确显示,当然结果是没问题的
r['base_progress'] += self.character_bonus_progress
r['character_bonus_progress'] = self.character_bonus_progress
if self.user_play.beyond_gauge == 0:
r["user_map"]["steps"] = [
x.to_dict for x in arcmap.steps_for_climbing]
else:
r["user_map"]["steps"] = len(arcmap.steps_for_climbing)
if self.user_play.stamina_multiply != 1:
r['stamina_multiply'] = self.user_play.stamina_multiply
if self.user_play.fragment_multiply != 100:
r['fragment_multiply'] = self.user_play.fragment_multiply
if self.user_play.prog_boost_multiply != 0:
r['prog_boost_multiply'] = self.user_play.prog_boost_multiply
return r
@property
def step_times(self) -> float:
return self.user_play.stamina_multiply * self.user_play.fragment_multiply / 100 * (self.user_play.prog_boost_multiply+100) / 100
@property
def exp_times(self) -> float:
return self.user_play.stamina_multiply * (self.user_play.prog_boost_multiply+100) / 100
def get_step(self) -> None:
if self.user_play.beyond_gauge == 0:
self.base_step_value = 2.5 + 2.45 * self.user_play.rating**0.5
prog = self.character_used.prog.get_value(
self.character_used.level)
if self.prog_tempest:
prog += self.prog_tempest
self.step_value = self.base_step_value * prog / 50 * self.step_times
else:
if self.user_play.clear_type == 0:
self.base_step_value = 25/28 + \
(self.user_play.rating)**0.5 * 0.43
else:
self.base_step_value = 75/28 + \
(self.user_play.rating)**0.5 * 0.43
if self.character_used.character_id in self.user.current_map.character_affinity:
affinity_multiplier = self.user.current_map.affinity_multiplier[self.user.current_map.character_affinity.index(
self.character_used.character_id)]
else:
affinity_multiplier = 1
overdrive = self.character_used.overdrive.get_value(
self.character_used.level)
if self.overdrive_extra:
overdrive += self.overdrive_extra
self.step_value = self.base_step_value * overdrive / \
50 * self.step_times * affinity_multiplier
def update(self) -> None:
'''世界模式更新'''
if self.user_play.prog_boost_multiply != 0:
self.user.update_prog_boost(0)
self.user_play.clear_play_state()
self.user.select_user_about_world_play()
self.character_used = Character()
self.user.character.select_character_info()
if not self.user.is_skill_sealed:
self.character_used = self.user.character
else:
self.character_used.character_id = self.user.character.character_id
self.character_used.level.level = self.user.character.level.level
self.character_used.level.exp = self.user.character.level.exp
self.character_used.frag.set_parameter(50, 50, 50)
self.character_used.prog.set_parameter(50, 50, 50)
self.character_used.overdrive.set_parameter(50, 50, 50)
self.user.current_map.select_map_info()
self.before_calculate()
self.get_step()
self.user.current_map.climb(self.step_value)
self.after_climb()
for i in self.user.current_map.rewards_for_climbing: # 物品分发
for j in i['items']:
j.c = self.c
j.user_claim_item(self.user)
x: 'Step' = self.user.current_map.steps_for_climbing[-1]
if x.step_type:
if 'plusstamina' in x.step_type and x.plus_stamina_value:
# 体力格子
self.user.stamina.stamina += x.plus_stamina_value
self.user.stamina.update()
# 角色升级
if self.character_used.database_table_name == 'user_char':
self.character_used.upgrade(
self.user, self.exp_times*self.user_play.rating*6)
if self.user.current_map.curr_position == self.user.current_map.step_count-1 and self.user.current_map.is_repeatable: # 循环图判断
self.user.current_map.curr_position = 0
self.user.current_map.update()
def before_calculate(self) -> None:
if self.user_play.beyond_gauge == 0:
if self.character_used.character_id == 35 and self.character_used.skill_id_displayed:
self._special_tempest()
else:
if self.character_used.skill_id_displayed == 'skill_vita':
self._skill_vita()
def after_climb(self) -> None:
factory_dict = {'eto_uncap': self._eto_uncap,
'ayu_uncap': self._ayu_uncap, 'luna_uncap': self._luna_uncap}
if self.character_used.skill_id_displayed in factory_dict:
factory_dict[self.character_used.skill_id_displayed]()
def _special_tempest(self) -> None:
'''风暴对立技能prog随全角色等级提升'''
if self.character_used.database_table_name == 'user_char_full':
self.prog_tempest = 60
else:
self.c.execute(
'''select sum(level) from user_char where user_id=?''', (self.user.user_id,))
x = self.c.fetchone()
self.prog_tempest = int(x[0]) / 10 if x else 0
if self.prog_tempest > 60:
self.prog_tempest = 60
elif self.prog_tempest < 0:
self.prog_tempest = 0
def _skill_vita(self) -> None:
'''
vita技能overdrive随回忆率提升提升量最多为10\
此处采用线性函数
'''
self.overdrive_extra = 0
if 0 < self.user_play.health <= 100:
self.overdrive_extra = self.user_play.health / 10
def _eto_uncap(self) -> None:
'''eto觉醒技能获得残片奖励时世界模式进度加7'''
fragment_flag = False
for i in self.user.current_map.rewards_for_climbing:
for j in i['items']:
if j.item_type == 'fragment':
fragment_flag = True
break
if fragment_flag:
break
if fragment_flag:
self.character_bonus_progress = Constant.ETO_UNCAP_BONUS_PROGRESS
self.step_value += self.character_bonus_progress * self.step_times
self.user.current_map.reclimb(self.step_value)
def _luna_uncap(self) -> None:
'''luna觉醒技能限制格开始时世界模式进度加7'''
x: 'Step' = self.user.current_map.steps_for_climbing[0]
if x.restrict_id and x.restrict_type:
self.character_bonus_progress = Constant.LUNA_UNCAP_BONUS_PROGRESS
self.step_value += self.character_bonus_progress * self.step_times
self.user.current_map.reclimb(self.step_value)
def _ayu_uncap(self) -> None:
'''ayu觉醒技能世界模式进度+5或-5但不会小于0'''
self.character_bonus_progress = Constant.AYU_UNCAP_BONUS_PROGRESS if random(
) >= 0.5 else -Constant.AYU_UNCAP_BONUS_PROGRESS
self.step_value += self.character_bonus_progress * self.step_times
if self.step_value < 0:
self.character_bonus_progress += self.step_value / self.step_times
self.step_value = 0
self.user.current_map.reclimb(self.step_value)