[Enhance][Bug fix] download songlist & new skill & API null recent

- Add `amane` and add support for `skill_amane`
- Add a parser for `songlist` to specify downloadable files
- Fix a bug that users with no recent scores cannot get `recent30` via API
This commit is contained in:
Lost-MSth
2022-11-10 18:00:27 +08:00
parent b70bfd6081
commit e1ce4d9ec4
11 changed files with 168 additions and 60 deletions

View File

@@ -1,6 +1,6 @@
from .config_manager import Config
ARCAEA_SERVER_VERSION = 'v2.10.0.3'
ARCAEA_SERVER_VERSION = 'v2.10.1'
class Constant:

View File

@@ -29,20 +29,69 @@ def initialize_songfile():
del x
@lru_cache()
def get_only_3_song_ids():
'''初始化只能下载byd相关的歌曲id'''
if not os.path.isfile(Constant.SONGLIST_FILE_PATH):
return []
only_3_song_ids = []
data = []
with open(Constant.SONGLIST_FILE_PATH, 'r', encoding='utf-8') as f:
data = loads(f.read())['songs']
for x in data:
if 'remote_dl' not in x or 'remote_dl' in x and not x['remote_dl']:
if any(i['ratingClass'] == 3 for i in x['difficulties']):
only_3_song_ids.append(x['id'])
return only_3_song_ids
class SonglistParser:
'''songlist文件解析器'''
FILE_NAMES = ['0.aff', '1.aff', '2.aff', '3.aff',
'base.ogg', '3.ogg', 'video.mp4', 'video_audio.ogg']
songs: dict = {} # {song_id: value, ...}
# value: bit 76543210
# 7: video_audio.ogg
# 6: video.mp4
# 5: 3.ogg
# 4: base.ogg
# 3: 3.aff
# 2: 2.aff
# 1: 1.aff
# 0: 0.aff
def __init__(self, path=Constant.SONGLIST_FILE_PATH) -> None:
self.path = path
self.data: list = []
self.parse()
@staticmethod
def is_available_file(song_id: str, file_name: str) -> list:
'''判断文件是否允许被下载'''
if song_id not in SonglistParser.songs:
# songlist没有则只限制文件名
return file_name in SonglistParser.FILE_NAMES
rule = SonglistParser.songs[song_id]
for i in range(8):
if file_name == SonglistParser.FILE_NAMES[i] and rule & (1 << i) != 0:
return True
return False
def parse_one(self, song: dict) -> dict:
'''解析单个歌曲'''
if not 'id' in song:
return None
r = 0
if 'remote_dl' in song and song['remote_dl']:
r |= 16
for i in song.get('difficulties', []):
if i['ratingClass'] == 3 and i.get('audioOverride', False):
r |= 32
r |= 1 << i['ratingClass']
else:
if any(i['ratingClass'] == 3 for i in song.get('difficulties', [])):
r |= 8
if 'additional_files' in song:
if 'video.mp4' in song['additional_files']:
r |= 64
if 'video_audio.ogg' in song['additional_files']:
r |= 128
return {song['id']: r}
def parse(self) -> None:
'''解析songlist文件'''
if not os.path.isfile(self.path):
return
with open(self.path, 'r', encoding='utf-8') as f:
self.data = loads(f.read()).get('songs', [])
for x in self.data:
self.songs.update(self.parse_one(x))
class UserDownload:
@@ -144,9 +193,9 @@ class DownloadList(UserDownload):
def clear_all_cache():
'''清除所有歌曲文件有关缓存'''
get_song_file_md5.cache_clear()
get_only_3_song_ids.cache_clear()
DownloadList.get_one_song_file_names.cache_clear()
DownloadList.get_all_song_ids.cache_clear()
SonglistParser()
def clear_download_token(self) -> None:
'''清除过期下载链接'''
@@ -164,9 +213,7 @@ class DownloadList(UserDownload):
'''获取一个歌曲文件夹下的所有合法文件名有lru缓存'''
r = []
for i in os.listdir(os.path.join(Constant.SONG_FILE_FOLDER_PATH, song_id)):
if os.path.isfile(os.path.join(Constant.SONG_FILE_FOLDER_PATH, song_id, i)) and i in ['0.aff', '1.aff', '2.aff', '3.aff', 'base.ogg', '3.ogg', 'video.mp4', 'video_audio.ogg']:
if song_id in get_only_3_song_ids() and i not in ['3.aff', '3.ogg']:
continue
if os.path.isfile(os.path.join(Constant.SONG_FILE_FOLDER_PATH, song_id, i)) and SonglistParser.is_available_file(song_id, i):
r.append(i)
return r

View File

@@ -8,6 +8,7 @@ from time import time
from core.config_manager import Config
from core.constant import ARCAEA_SERVER_VERSION
from core.course import Course
from core.download import SonglistParser
from core.purchase import Purchase
from core.sql import Connect, DatabaseMigrator, MemoryDatabase
from core.user import UserRegister
@@ -241,5 +242,7 @@ class FileChecker:
def check_before_run(self) -> bool:
'''运行前检查,返回布尔值'''
# TODO: try
MemoryDatabase() # 初始化内存数据库
SonglistParser() # 解析songlist
return self.check_folder(Config.SONG_FILE_FOLDER_PATH) & self.check_update_database()

View File

@@ -495,6 +495,9 @@ class Potential:
self.c.execute(
'''select * from recent30 where user_id = :a''', {'a': self.user.user_id})
x = self.c.fetchone()
if not x:
raise NoData(
f'No recent30 data for user `{self.user.user_id}`', api_error_code=-3)
self.r30 = []
self.s30 = []
if not x:
@@ -529,11 +532,15 @@ class Potential:
def recent_30_to_dict_list(self) -> list:
if self.r30 is None:
self.select_recent_30()
return [{
'song_id': self.s30[i][:-1],
'difficulty': int(self.s30[i][-1]),
'rating': self.r30[i]
} for i in range(len(self.r30))]
r = []
for x, y in zip(self.s30, self.r30):
if x:
r.append({
'song_id': x[:-1],
'difficulty': int(x[-1]),
'rating': y
})
return r
def recent_30_update(self, pop_index: int, rating: float, song_id_difficulty: str) -> None:
self.r30.pop(pop_index)

View File

@@ -477,7 +477,7 @@ class WorldPlay:
if self.character_bonus_progress is not None:
# 猜的,为了让客户端正确显示,当然结果是没问题的
r['base_progress'] += self.character_bonus_progress
# r['base_progress'] += self.character_bonus_progress # 肯定不是这样的
r['character_bonus_progress'] = self.character_bonus_progress
if self.user_play.beyond_gauge == 0:
@@ -593,7 +593,7 @@ class WorldPlay:
def after_climb(self) -> None:
factory_dict = {'eto_uncap': self._eto_uncap, 'ayu_uncap': self._ayu_uncap,
'luna_uncap': self._luna_uncap, 'skill_fatalis': self._skill_fatalis}
'luna_uncap': self._luna_uncap, 'skill_fatalis': self._skill_fatalis, 'skill_amane': self._skill_amane}
if self.character_used.skill_id_displayed in factory_dict:
factory_dict[self.character_used.skill_id_displayed]()
@@ -666,3 +666,14 @@ class WorldPlay:
self.user.world_mode_locked_end_ts = int(
time()*1000) + Constant.SKILL_FATALIS_WORLD_LOCKED_TIME
self.user.update_user_one_column('world_mode_locked_end_ts')
def _skill_amane(self) -> None:
'''
amane技能起始格为限速或随机成绩小于EX时世界模式进度减半
偷懒在after_climb里面需要重爬一次
'''
x: 'Step' = self.user.current_map.steps_for_climbing[0]
if ('randomsong' in x.step_type or 'speedlimit' in x.step_type) and self.user_play.song_grade < 5:
self.character_bonus_progress = -self.step_value / 2 / self.step_times
self.step_value = self.step_value / 2
self.user.current_map.reclimb(self.step_value)