Files
Arcaea-server/latest version/core/world.py
Lost-MSth 6fcca17918 Code refactoring
- Code refactoring
- Fix a bug where the other player does not immediately become the host of the room when a player disconnects in Link Play.

> This refactoring may have introduced many unknown bugs. XD
> The song database `arcsong.db` will not be used in the future. You can use a tool in the `tool` folder to import old data.
2022-07-04 18:36:30 +08:00

664 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
from functools import lru_cache
from random import random
from time import time
from .character import Character
from .constant import Constant
from .error import InputError, MapLocked, NoData
from .item import ItemFactory
@lru_cache(maxsize=128)
def get_world_name(file_dir: str = Constant.WORLD_MAP_FOLDER_PATH) -> list:
    '''
    Collect the names of all maps and return them as a list.

    Every file below `file_dir` (sub-folders included) whose extension is
    `.json` contributes its file name without the extension.
    The result is memoised per `file_dir` by `lru_cache`.
    '''
    split_names = (
        os.path.splitext(filename)
        for _, _, filenames in os.walk(file_dir)
        for filename in filenames
    )
    return [stem for stem, extension in split_names if extension == '.json']
@lru_cache(maxsize=128)
def get_world_info(map_id: str) -> dict:
    '''
    Read the JSON file of the map `map_id` and return its parsed content.

    The file is `<Constant.WORLD_MAP_FOLDER_PATH>/<map_id>.json`.
    The result is memoised per `map_id` by `lru_cache`, so callers share the
    same object and should treat it as read-only.

    Raises whatever `open` / `json.load` raise (e.g. `FileNotFoundError`
    when the map file does not exist).
    '''
    map_path = os.path.join(Constant.WORLD_MAP_FOLDER_PATH, map_id+'.json')
    # Open in binary mode so that `json` detects the encoding (UTF-8/16/32)
    # itself instead of decoding with the platform's default locale encoding.
    with open(map_path, 'rb') as f:
        return json.load(f)
def get_world_all(c, user) -> list:
    '''
    Build one `UserMap` per known map name and return them as a list.

    parameters: `c` - passed through to every `UserMap` as its first argument
                `user` - an instance of `User` or of a subclass
    '''
    user_maps = []
    for name in get_world_name():
        user_maps.append(UserMap(c, name, user))
    return user_maps
class Step:
    '''A single step (tile) of a world map.'''

    def __init__(self) -> None:
        # `position` and `capture` are always serialised; every other field
        # is optional and only serialised when it holds a truthy value.
        self.position: int = None
        self.capture: int = None
        self.items: list = []
        self.restrict_id: str = None
        self.restrict_ids: list = []
        self.restrict_type: str = None
        self.step_type: list = None
        self.speed_limit_value: int = None
        self.plus_stamina_value: int = None

    @property
    def to_dict(self) -> dict:
        '''
        Dictionary form of the step (this is a property, not a method).

        `position` and `capture` are always present; the optional fields are
        included only when truthy. Items are serialised through their own
        `to_dict()` method.
        '''
        r = {
            'position': self.position,
            'capture': self.capture,
        }
        if self.items:
            r['items'] = [i.to_dict() for i in self.items]
        if self.restrict_id:
            r['restrict_id'] = self.restrict_id
        if self.restrict_ids:
            r['restrict_ids'] = self.restrict_ids
        if self.restrict_type:
            r['restrict_type'] = self.restrict_type
        if self.step_type:
            r['step_type'] = self.step_type
        if self.speed_limit_value:
            r['speed_limit_value'] = self.speed_limit_value
        if self.plus_stamina_value:
            r['plus_stamina_value'] = self.plus_stamina_value
        return r

    def from_dict(self, d: dict) -> 'Step':
        '''
        Fill this step from the dictionary `d` and return `self`.

        `position` and `capture` are required (`KeyError` when missing);
        the remaining keys are optional and become `None` when absent.
        `items`, when present, is converted with `ItemFactory.from_dict`.
        '''
        self.position = d['position']
        self.capture = d['capture']
        self.restrict_id = d.get('restrict_id')
        self.restrict_ids = d.get('restrict_ids')
        self.restrict_type = d.get('restrict_type')
        self.step_type = d.get('step_type')
        self.speed_limit_value = d.get('speed_limit_value')
        self.plus_stamina_value = d.get('plus_stamina_value')
        if 'items' in d:
            self.items = [ItemFactory.from_dict(i) for i in d['items']]
        return self
class Map:
    '''
    Definition of one world map: its metadata plus its list of `Step`s.

    The data is loaded from the map's JSON file through `select_map_info`
    (or from any dictionary through `from_dict`).
    '''

    def __init__(self, map_id: str = None) -> None:
        self.map_id: str = map_id
        self.is_legacy: bool = None
        self.is_beyond: bool = None
        self.beyond_health: int = None
        self.character_affinity: list = []
        self.affinity_multiplier: list = []
        self.chapter: int = None
        self.available_from: int = None
        self.available_to: int = None
        self.is_repeatable: bool = None
        self.require_id: str = None
        self.require_type: str = None
        self.require_value: int = None
        self.coordinate: str = None
        self.custom_bg: str = None
        self.stamina_cost: int = None
        self.steps: list = []
        # Lazily built by `get_rewards`; `None` means "not built yet".
        self.__rewards: list = None

    @property
    def rewards(self) -> list:
        '''Rewards of the whole map, built on first access (see `get_rewards`).'''
        if self.__rewards is None:
            self.get_rewards()
        return self.__rewards

    def get_rewards(self) -> list:
        '''
        Rebuild the reward list from `steps` and return it.

        Each entry is `{'items': [...], 'position': ...}` for a step that has
        items. When no steps are loaded nothing is rebuilt and the current
        cached value (possibly `None`) is returned.
        '''
        if self.steps:
            self.__rewards = [
                {'items': [i.to_dict() for i in step.items],
                 'position': step.position}
                for step in self.steps if step.items
            ]
        return self.__rewards

    @property
    def step_count(self):
        '''Number of steps currently loaded.'''
        return len(self.steps)

    def to_dict(self) -> dict:
        '''
        Dictionary form of the map, including every step.

        The map info is loaded first when `chapter` is still `None`.
        '''
        if self.chapter is None:
            self.select_map_info()
        return {
            'map_id': self.map_id,
            'is_legacy': self.is_legacy,
            'is_beyond': self.is_beyond,
            'beyond_health': self.beyond_health,
            'character_affinity': self.character_affinity,
            'affinity_multiplier': self.affinity_multiplier,
            'chapter': self.chapter,
            'available_from': self.available_from,
            'available_to': self.available_to,
            'is_repeatable': self.is_repeatable,
            'require_id': self.require_id,
            'require_type': self.require_type,
            'require_value': self.require_value,
            'coordinate': self.coordinate,
            'custom_bg': self.custom_bg,
            'stamina_cost': self.stamina_cost,
            'step_count': self.step_count,
            'steps': [s.to_dict for s in self.steps],
        }

    def from_dict(self, raw_dict: dict) -> 'Map':
        '''
        Fill this map from `raw_dict` and return `self`.

        All keys are optional. Scalar fields become `None` when absent; the
        list fields (`character_affinity`, `affinity_multiplier`, `steps`)
        fall back to an empty list so that later membership tests and
        iteration do not fail on `None`.
        '''
        self.is_legacy = raw_dict.get('is_legacy')
        self.is_beyond = raw_dict.get('is_beyond')
        self.beyond_health = raw_dict.get('beyond_health')
        self.character_affinity = raw_dict.get('character_affinity') or []
        self.affinity_multiplier = raw_dict.get('affinity_multiplier') or []
        self.chapter = raw_dict.get('chapter')
        self.available_from = raw_dict.get('available_from')
        self.available_to = raw_dict.get('available_to')
        self.is_repeatable = raw_dict.get('is_repeatable')
        self.require_id = raw_dict.get('require_id')
        self.require_type = raw_dict.get('require_type')
        self.require_value = raw_dict.get('require_value')
        self.coordinate = raw_dict.get('coordinate')
        self.custom_bg = raw_dict.get('custom_bg')
        self.stamina_cost = raw_dict.get('stamina_cost')
        self.steps = [Step().from_dict(s)
                      for s in raw_dict.get('steps') or []]
        # The steps changed, so any previously built reward list is stale.
        self.__rewards = None
        return self

    def select_map_info(self):
        '''Load the map information from the JSON file of `map_id`.'''
        self.from_dict(get_world_info(self.map_id))
class UserMap(Map):
    '''
    A map combined with one user's progress on it.

    parameters: `c` - object used to run SQL (`execute` / `fetchone`)
                `map_id` - id of the map
                `user` - an instance of `User` or of a subclass
    '''

    def __init__(self, c=None, map_id: str = None, user=None) -> None:
        super().__init__(map_id)
        self.c = c
        self.user = user
        self.is_locked: bool = None
        # Progress after the latest climb ...
        self.curr_position: int = None
        self.curr_capture: int = None
        # ... and the progress recorded just before it.
        self.prev_position: int = None
        self.prev_capture: int = None

    def _steps_in_climb_range(self) -> list:
        '''Steps indexed from `prev_position` through `curr_position`, both inclusive.'''
        first, last = self.prev_position, self.curr_position
        return [self.steps[index] for index in range(first, last + 1)]

    @property
    def rewards_for_climbing(self) -> list:
        '''Rewards (item objects) on the steps covered by the latest climb.'''
        return [
            {'items': step.items, 'position': step.position}
            for step in self._steps_in_climb_range()
            if step.items
        ]

    @property
    def rewards_for_climbing_to_dict(self) -> list:
        '''Same as `rewards_for_climbing`, with every item converted by `to_dict()`.'''
        return [
            {'items': [item.to_dict() for item in step.items],
             'position': step.position}
            for step in self._steps_in_climb_range()
            if step.items
        ]

    @property
    def steps_for_climbing(self) -> list:
        '''Slice of `steps` from `prev_position` through `curr_position`, inclusive.'''
        start = self.prev_position
        stop = self.curr_position + 1
        return self.steps[start:stop]

    def to_dict(self, has_map_info: bool = False, has_steps: bool = False, has_rewards: bool = False) -> dict:
        '''
        Dictionary form of the user's map.

        Without `has_map_info` only the user's progress is returned. With it,
        the full map dictionary is returned plus the progress; `steps` is
        dropped unless `has_steps`, and `rewards` is added when `has_rewards`.
        Progress and map info are loaded on demand.
        '''
        if self.is_locked is None:
            self.select()

        if not has_map_info:
            return {
                'map_id': self.map_id,
                'curr_position': self.curr_position,
                'curr_capture': self.curr_capture,
                'is_locked': self.is_locked,
                'user_id': self.user.user_id,
            }

        if self.chapter is None:
            self.select_map_info()
        result = super().to_dict()
        result.update(
            curr_position=self.curr_position,
            curr_capture=self.curr_capture,
            is_locked=self.is_locked,
            user_id=self.user.user_id,
        )
        if not has_steps:
            result.pop('steps')
        if has_rewards:
            result['rewards'] = self.rewards
        return result

    def initialize(self):
        '''Insert the initial `user_world` row for this user and map.'''
        params = {'a': self.user.user_id, 'b': self.map_id}
        self.c.execute(
            '''insert into user_world values(:a,:b,0,0,1)''', params)

    def update(self):
        '''Write the current position, capture and lock state to the database.'''
        params = {
            'a': self.curr_position,
            'b': self.curr_capture,
            'c': 1 if self.is_locked else 0,
            'd': self.user.user_id,
            'e': self.map_id,
        }
        self.c.execute(
            '''update user_world set curr_position=:a,curr_capture=:b,is_locked=:c where user_id=:d and map_id=:e''', params)

    def select(self):
        '''
        Load the user's progress on this map.

        When no row exists yet, the progress is set to a locked map at
        position 0 and that row is inserted.
        '''
        params = {'a': self.map_id, 'b': self.user.user_id}
        self.c.execute(
            '''select curr_position, curr_capture, is_locked from user_world where map_id = :a and user_id = :b''', params)
        row = self.c.fetchone()
        if not row:
            self.curr_position = 0
            self.curr_capture = 0
            self.is_locked = True
            self.initialize()
            return
        self.curr_position = row[0]
        self.curr_capture = row[1]
        self.is_locked = row[2] == 1

    def change_user_current_map(self):
        '''Make this map the user's current map, in memory and in the database.'''
        self.user.current_map = self
        params = {'a': self.map_id, 'b': self.user.user_id}
        self.c.execute(
            '''update user set current_map = :a where user_id=:b''', params)

    def unlock(self) -> bool:
        '''
        Try to unlock this map for the user; return whether it is unlocked.

        An already unlocked map is left untouched. Otherwise the progress is
        reset to 0 and, for a `pack` / `single` requirement, the map stays
        locked when the user owns none of the required item.
        '''
        self.select()
        if not self.is_locked:
            return True

        self.is_locked = False
        self.curr_position = 0
        self.curr_capture = 0
        self.select_map_info()
        # `None` and '' are not in the tuple, so they need no separate check.
        if self.require_type in ('pack', 'single'):
            required = ItemFactory(self.c).get_item(self.require_type)
            required.item_id = self.require_id
            required.select(self.user)
            if not required.amount:
                self.is_locked = True
        self.update()
        return not self.is_locked

    def climb(self, step_value: float) -> None:
        '''
        Advance on the map by `step_value`, which must be non-negative.

        The old progress is stored in `prev_position` / `prev_capture` and
        the new one in `curr_position` / `curr_capture`. Nothing is written
        to the database here.

        Raises `InputError` for a negative value and `MapLocked` when the
        map is locked.
        '''
        if step_value < 0:
            raise InputError('`Step_value` must be non-negative.')
        if self.curr_position is None:
            self.select()
        if self.is_beyond is None:
            self.select_map_info()
        if self.is_locked:
            raise MapLocked('The map is locked.')

        self.prev_capture = self.curr_capture
        self.prev_position = self.curr_position
        last_index = self.step_count - 1

        if self.is_beyond:
            # Beyond map: capture accumulates over the whole map, capped at
            # `beyond_health`; the position is recomputed from the first step.
            if self.beyond_health - self.prev_capture >= step_value:
                self.curr_capture = self.prev_capture + step_value
            else:
                self.curr_capture = self.beyond_health

            index = 0
            remaining = self.prev_capture + step_value
            while index < self.step_count and remaining > 0:
                needed = self.steps[index].capture
                if needed > remaining:
                    break
                remaining -= needed
                index += 1
            self.curr_position = min(index, last_index)
            return

        # Normal map: continue from the previous step and its partial capture.
        index = self.prev_position
        filled = self.prev_capture
        remaining = step_value
        while remaining > 0 and index < self.step_count:
            gap = self.steps[index].capture - filled
            if gap > remaining:
                filled += remaining
                break
            remaining -= gap
            filled = 0
            index += 1

        if index >= self.step_count:
            # Walked past the end: stay on the last step with no capture.
            self.curr_position = last_index
            self.curr_capture = 0
        else:
            self.curr_position = index
            self.curr_capture = filled

    def reclimb(self, step_value: float) -> None:
        '''Undo the latest climb and climb again with `step_value`.'''
        self.curr_capture = self.prev_capture
        self.curr_position = self.prev_position
        self.climb(step_value)
class Stamina:
    '''
    Stamina value, tracked together with `max_stamina_ts`.

    The current value is derived from `max_stamina_ts`, the current time in
    milliseconds, `Constant.MAX_STAMINA` and `Constant.STAMINA_RECOVER_TICK`.
    '''

    def __init__(self) -> None:
        self.__stamina: int = None
        self.max_stamina_ts: int = None

    def set_value(self, max_stamina_ts: int, stamina: int):
        '''
        Store raw values as integers.

        A falsy `max_stamina_ts` becomes 0 and a falsy `stamina` becomes
        `Constant.MAX_STAMINA`.
        '''
        if max_stamina_ts:
            self.max_stamina_ts = int(max_stamina_ts)
        else:
            self.max_stamina_ts = 0

        if stamina:
            self.__stamina = int(stamina)
        else:
            self.__stamina = Constant.MAX_STAMINA

    @property
    def stamina(self) -> int:
        '''Current stamina, computed from the time left until `max_stamina_ts`; never negative.'''
        now_ms = int(time()*1000)
        ticks_left = (self.max_stamina_ts - now_ms) / \
            Constant.STAMINA_RECOVER_TICK
        current = int(Constant.MAX_STAMINA - ticks_left)
        if current >= Constant.MAX_STAMINA:
            # Recovery stops at the maximum; a stored value above the
            # maximum is kept as it is.
            current = max(self.__stamina, Constant.MAX_STAMINA)
        return max(current, 0)

    @stamina.setter
    def stamina(self, value: int) -> None:
        '''Set the stamina value; this also recomputes `max_stamina_ts`.'''
        self.__stamina = int(value)
        surplus = self.__stamina - Constant.MAX_STAMINA
        self.max_stamina_ts = int(
            time()*1000) - surplus * Constant.STAMINA_RECOVER_TICK
class UserStamina(Stamina):
    '''
    Stamina of one user, stored in the `user` table.

    parameters: `c` - object used to run SQL (`execute` / `fetchone`)
                `user` - an instance of `User` or of a subclass
    '''

    def __init__(self, c=None, user=None) -> None:
        super().__init__()
        self.c = c
        self.user = user

    def select(self):
        '''
        Load the user's stamina information from the database.

        Raises `NoData` when no row exists for the user.
        '''
        # The column list and `from` must be separated by whitespace.
        self.c.execute('''select max_stamina_ts, stamina from user where user_id = :a''',
                       {'a': self.user.user_id})
        x = self.c.fetchone()
        if not x:
            raise NoData('The user does not exist.')
        self.set_value(x[0], x[1])

    def update(self):
        '''Write the current (computed) stamina and `max_stamina_ts` to the database.'''
        self.c.execute('''update user set max_stamina_ts=:b, stamina=:a where user_id=:c''', {
            'a': self.stamina, 'b': self.max_stamina_ts, 'c': self.user.user_id})
class WorldPlay:
    '''
    A world mode play: handles special character skills and ties together
    `UserMap` and `UserPlay`.

    parameters: `c` - object used to run SQL (`execute` / `fetchone`)
                `user` - an instance of `UserOnline` or of a subclass
                `user_play` - an instance of `UserPlay`
    '''

    def __init__(self, c=None, user=None, user_play=None) -> None:
        self.c = c
        self.user = user
        self.user_play = user_play
        self.character_used = None
        self.base_step_value: float = None
        self.step_value: float = None
        # Skill bonuses; `None` means the skill did not apply to this play.
        self.prog_tempest: float = None
        self.overdrive_extra: float = None
        self.character_bonus_progress: float = None

    @property
    def to_dict(self) -> dict:
        '''Result of the play as a dictionary (this is a property, not a method).'''
        arcmap: 'UserMap' = self.user.current_map
        r = {
            "rewards": arcmap.rewards_for_climbing_to_dict,
            "exp": self.character_used.level.exp,
            "level": self.character_used.level.level,
            "base_progress": self.base_step_value,
            "progress": self.step_value,
            "user_map": {
                "user_id": self.user.user_id,
                "curr_position": arcmap.curr_position,
                "curr_capture": arcmap.curr_capture,
                "is_locked": arcmap.is_locked,
                "map_id": arcmap.map_id,
                "prev_capture": arcmap.prev_capture,
                "prev_position": arcmap.prev_position,
                "beyond_health": arcmap.beyond_health
            },
            "char_stats": {
                "character_id": self.character_used.character_id,
                "frag": self.character_used.frag.get_value(self.character_used.level),
                "prog": self.character_used.prog.get_value(self.character_used.level),
                "overdrive": self.character_used.overdrive.get_value(self.character_used.level)
            },
            "current_stamina": self.user.stamina.stamina,
            "max_stamina_ts": self.user.stamina.max_stamina_ts
        }

        if self.overdrive_extra is not None:
            r['char_stats']['overdrive'] += self.overdrive_extra

        if self.prog_tempest is not None:
            r['char_stats']['prog'] += self.prog_tempest
            r['char_stats']['prog_tempest'] = self.prog_tempest

        if self.character_bonus_progress is not None:
            # A guess, made so that the client displays the value correctly;
            # the computed result itself is not affected.
            r['base_progress'] += self.character_bonus_progress
            r['character_bonus_progress'] = self.character_bonus_progress

        if self.user_play.beyond_gauge == 0:
            r["user_map"]["steps"] = [
                x.to_dict for x in arcmap.steps_for_climbing]
        else:
            # For a beyond play only the number of steps is reported.
            r["user_map"]["steps"] = len(arcmap.steps_for_climbing)

        # Multipliers are reported only when they differ from their neutral value.
        if self.user_play.stamina_multiply != 1:
            r['stamina_multiply'] = self.user_play.stamina_multiply
        if self.user_play.fragment_multiply != 100:
            r['fragment_multiply'] = self.user_play.fragment_multiply
        if self.user_play.prog_boost_multiply != 0:
            r['prog_boost_multiply'] = self.user_play.prog_boost_multiply
        return r

    @property
    def step_times(self) -> float:
        '''Combined multiplier applied to the step value.'''
        return self.user_play.stamina_multiply * self.user_play.fragment_multiply / 100 * (self.user_play.prog_boost_multiply+100) / 100

    @property
    def exp_times(self) -> float:
        '''Combined multiplier applied to the character experience.'''
        return self.user_play.stamina_multiply * (self.user_play.prog_boost_multiply+100) / 100

    def get_step(self) -> None:
        '''
        Compute `base_step_value` and `step_value` for this play.

        A play with `beyond_gauge == 0` uses the character's prog value (plus
        `prog_tempest`); any other play uses the overdrive value (plus
        `overdrive_extra`) and the map's affinity multiplier for the character.
        '''
        if self.user_play.beyond_gauge == 0:
            self.base_step_value = 2.5 + 2.45 * self.user_play.rating**0.5
            prog = self.character_used.prog.get_value(
                self.character_used.level)
            if self.prog_tempest:
                prog += self.prog_tempest

            self.step_value = self.base_step_value * prog / 50 * self.step_times
        else:
            if self.user_play.clear_type == 0:
                self.base_step_value = 25/28 + \
                    (self.user_play.rating)**0.5 * 0.43
            else:
                self.base_step_value = 75/28 + \
                    (self.user_play.rating)**0.5 * 0.43

            # `character_affinity` and `affinity_multiplier` are parallel lists.
            if self.character_used.character_id in self.user.current_map.character_affinity:
                affinity_multiplier = self.user.current_map.affinity_multiplier[self.user.current_map.character_affinity.index(
                    self.character_used.character_id)]
            else:
                affinity_multiplier = 1

            overdrive = self.character_used.overdrive.get_value(
                self.character_used.level)
            if self.overdrive_extra:
                overdrive += self.overdrive_extra

            self.step_value = self.base_step_value * overdrive / \
                50 * self.step_times * affinity_multiplier

    def update(self) -> None:
        '''
        Apply the world mode play: compute the step value, climb the current
        map, hand out rewards, level up the character and save the map.
        '''
        if self.user_play.prog_boost_multiply != 0:
            self.user.update_prog_boost(0)

        self.user_play.clear_play_state()
        self.user.select_user_about_world_play()

        self.character_used = Character()
        self.user.character.select_character_info()
        if not self.user.is_skill_sealed:
            self.character_used = self.user.character
        else:
            # Skill sealed: keep id, level and exp, but use flat stats of 50.
            self.character_used.character_id = self.user.character.character_id
            self.character_used.level.level = self.user.character.level.level
            self.character_used.level.exp = self.user.character.level.exp
            self.character_used.frag.set_parameter(50, 50, 50)
            self.character_used.prog.set_parameter(50, 50, 50)
            self.character_used.overdrive.set_parameter(50, 50, 50)

        self.user.current_map.select_map_info()
        self.before_calculate()
        self.get_step()
        self.user.current_map.climb(self.step_value)
        self.after_climb()

        # Hand out the items on the climbed steps.
        for i in self.user.current_map.rewards_for_climbing:
            for j in i['items']:
                j.c = self.c
                j.user_claim_item(self.user)

        x: 'Step' = self.user.current_map.steps_for_climbing[-1]
        if x.step_type:
            if 'plusstamina' in x.step_type and x.plus_stamina_value:
                # Stamina step: landing on it grants extra stamina.
                self.user.stamina.stamina += x.plus_stamina_value
                self.user.stamina.update()

        # Character level up.
        if self.character_used.database_table_name == 'user_char':
            self.character_used.upgrade(
                self.user, self.exp_times*self.user_play.rating*6)

        # Repeatable map: go back to the start after reaching the last step.
        if self.user.current_map.curr_position == self.user.current_map.step_count-1 and self.user.current_map.is_repeatable:
            self.user.current_map.curr_position = 0
        self.user.current_map.update()

    def before_calculate(self) -> None:
        '''Apply the character skills that change stats before the step value is computed.'''
        if self.user_play.beyond_gauge == 0:
            if self.character_used.character_id == 35 and self.character_used.skill_id_displayed:
                self._special_tempest()
        else:
            if self.character_used.skill_id_displayed == 'skill_vita':
                self._skill_vita()

    def after_climb(self) -> None:
        '''Apply the character skills that act after the climb, dispatched by displayed skill id.'''
        factory_dict = {'eto_uncap': self._eto_uncap,
                        'ayu_uncap': self._ayu_uncap, 'luna_uncap': self._luna_uncap}
        if self.character_used.skill_id_displayed in factory_dict:
            factory_dict[self.character_used.skill_id_displayed]()

    def _special_tempest(self) -> None:
        '''
        Tempest Tairitsu skill: prog rises with the total level of all of the
        user's characters (a tenth of the sum, limited to the range 0..60).
        '''
        if self.character_used.database_table_name == 'user_char_full':
            self.prog_tempest = 60
        else:
            self.c.execute(
                '''select sum(level) from user_char where user_id=?''', (self.user.user_id,))
            x = self.c.fetchone()
            # `sum()` yields NULL (`None`) when no row matches, so guard the
            # value as well as the row before converting it.
            total_level = x[0] if x else None
            self.prog_tempest = int(
                total_level) / 10 if total_level is not None else 0
            if self.prog_tempest > 60:
                self.prog_tempest = 60
            elif self.prog_tempest < 0:
                self.prog_tempest = 0

    def _skill_vita(self) -> None:
        '''
        Vita skill: overdrive rises with the recollection rate
        (`user_play.health`), by at most 10.
        A linear function is used here.
        '''
        self.overdrive_extra = 0
        if 0 < self.user_play.health <= 100:
            self.overdrive_extra = self.user_play.health / 10

    def _eto_uncap(self) -> None:
        '''
        Eto uncap skill: when a fragment reward is obtained, world mode
        progress is increased by `Constant.ETO_UNCAP_BONUS_PROGRESS`
        (the original note says +7) and the climb is redone.
        '''
        fragment_flag = any(
            item.item_type == 'fragment'
            for reward in self.user.current_map.rewards_for_climbing
            for item in reward['items']
        )
        if fragment_flag:
            self.character_bonus_progress = Constant.ETO_UNCAP_BONUS_PROGRESS
            self.step_value += self.character_bonus_progress * self.step_times
            self.user.current_map.reclimb(self.step_value)

    def _luna_uncap(self) -> None:
        '''
        Luna uncap skill: when the climb starts on a restricted step, world
        mode progress is increased by `Constant.LUNA_UNCAP_BONUS_PROGRESS`
        (the original note says +7) and the climb is redone.
        '''
        x: 'Step' = self.user.current_map.steps_for_climbing[0]
        if x.restrict_id and x.restrict_type:
            self.character_bonus_progress = Constant.LUNA_UNCAP_BONUS_PROGRESS
            self.step_value += self.character_bonus_progress * self.step_times
            self.user.current_map.reclimb(self.step_value)

    def _ayu_uncap(self) -> None:
        '''
        Ayu uncap skill: world mode progress randomly gains or loses
        `Constant.AYU_UNCAP_BONUS_PROGRESS` (the original note says +5 or -5),
        but the step value never drops below 0. The climb is then redone.
        '''
        self.character_bonus_progress = Constant.AYU_UNCAP_BONUS_PROGRESS if random(
        ) >= 0.5 else -Constant.AYU_UNCAP_BONUS_PROGRESS

        self.step_value += self.character_bonus_progress * self.step_times
        if self.step_value < 0:
            # Shrink the reported penalty to the part that was actually applied.
            self.character_bonus_progress += self.step_value / self.step_times
            self.step_value = 0

        self.user.current_map.reclimb(self.step_value)