mirror of
https://github.com/Lost-MSth/Arcaea-server.git
synced 2025-12-14 08:06:23 +08:00
- Fix a bug that players will be kicked out when they haven't downloaded the chart before. - Fix a bug that the last song's scores will disappear when all players return to room. - For Arcaea 5.10.2
509 lines
17 KiB
Python
509 lines
17 KiB
Python
import logging
|
||
from time import time
|
||
from random import randint
|
||
|
||
from .config import Config
|
||
|
||
|
||
def b(value, length=1):
|
||
return value.to_bytes(length=length, byteorder='little')
|
||
|
||
|
||
def bi(value):
|
||
return int.from_bytes(value, byteorder='little')
|
||
|
||
|
||
class Score:
|
||
def __init__(self) -> None:
|
||
self.difficulty = 0xff
|
||
|
||
self.score = 0
|
||
self.cleartype = 0
|
||
self.timer = 0
|
||
|
||
self.best_score_flag = 0 # personal best
|
||
self.best_player_flag = 0 # high score
|
||
|
||
# 5.10 新增
|
||
self.shiny_perfect_count = 0 # 2 bytes
|
||
self.perfect_count = 0 # 2 bytes
|
||
self.near_count = 0 # 2 bytes
|
||
self.miss_count = 0 # 2 bytes
|
||
self.early_count = 0 # 2 bytes
|
||
self.late_count = 0 # 2 bytes
|
||
|
||
self.healthy = 0 # 4 bytes signed? 不确定,但似乎没影响
|
||
|
||
def copy(self, x: 'Score'):
|
||
self.difficulty = x.difficulty
|
||
self.score = x.score
|
||
self.cleartype = x.cleartype
|
||
self.timer = x.timer
|
||
self.best_score_flag = x.best_score_flag
|
||
self.best_player_flag = x.best_player_flag
|
||
self.shiny_perfect_count = x.shiny_perfect_count
|
||
self.perfect_count = x.perfect_count
|
||
self.near_count = x.near_count
|
||
self.miss_count = x.miss_count
|
||
self.early_count = x.early_count
|
||
self.late_count = x.late_count
|
||
self.healthy = x.healthy
|
||
|
||
def clear(self):
|
||
self.difficulty = 0xff
|
||
self.score = 0
|
||
self.cleartype = 0
|
||
self.timer = 0
|
||
self.best_score_flag = 0
|
||
self.best_player_flag = 0
|
||
self.shiny_perfect_count = 0
|
||
self.perfect_count = 0
|
||
self.near_count = 0
|
||
self.miss_count = 0
|
||
self.early_count = 0
|
||
self.late_count = 0
|
||
self.healthy = 0
|
||
|
||
def __str__(self):
|
||
return f'Score: {self.score}, Cleartype: {self.cleartype}, Difficulty: {self.difficulty}, Timer: {self.timer}, Best Score Flag: {self.best_score_flag}, Best Player Flag: {self.best_player_flag}, Shiny Perfect: {self.shiny_perfect_count}, Perfect: {self.perfect_count}, Near: {self.near_count}, Miss: {self.miss_count}, Early: {self.early_count}, Late: {self.late_count}, Healthy: {self.healthy}'
|
||
|
||
|
||
class Player:
|
||
def __init__(self, player_index: int = 0) -> None:
|
||
self.player_id = 0
|
||
self.player_name = b'\x45\x6d\x70\x74\x79\x50\x6c\x61\x79\x65\x72\x00\x00\x00\x00\x00'
|
||
self.token = 0
|
||
|
||
self.character_id = 0xff
|
||
self.is_uncapped = 0
|
||
|
||
self.score = Score()
|
||
self.last_score = Score()
|
||
|
||
self.finish_flag = 0
|
||
|
||
self.player_state = 1
|
||
self.download_percent = 0
|
||
self.online = 0
|
||
|
||
self.last_timestamp = 0
|
||
self.extra_command_queue = []
|
||
|
||
self.song_unlock: bytes = b'\x00' * Config.LINK_PLAY_UNLOCK_LENGTH
|
||
|
||
self.start_command_num = 0
|
||
|
||
# 5.10 新增
|
||
|
||
self.voting: int = 0x8000 # 2 bytes, song_idx, 0xffff 为不选择,0x8000 为默认值
|
||
self.player_index: int = player_index # 1 byte 不确定对不对
|
||
self.switch_2: int = 0 # 1 byte
|
||
|
||
self.rating_ptt: int = 0 # 2 bytes
|
||
self.is_hide_rating: int = 0 # 1 byte
|
||
self.is_staff: int = 0 # 1 byte
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return self.player_name.decode('ascii').rstrip('\x00')
|
||
|
||
def to_dict(self) -> dict:
|
||
return {
|
||
'multiplay_player_id': self.player_id,
|
||
'name': self.name,
|
||
'is_online': self.online == 1,
|
||
'character_id': self.character_id,
|
||
'is_uncapped': self.is_uncapped == 1,
|
||
'rating_ptt': self.rating_ptt,
|
||
'is_hide_rating': self.is_hide_rating == 1,
|
||
'last_song': {
|
||
'difficulty': self.last_score.difficulty,
|
||
'score': self.last_score.score,
|
||
'cleartype': self.last_score.cleartype,
|
||
'shine_perfect': self.last_score.shiny_perfect_count,
|
||
'perfect': self.last_score.perfect_count,
|
||
'near': self.last_score.near_count,
|
||
'miss': self.last_score.miss_count,
|
||
'early': self.last_score.early_count,
|
||
'late': self.last_score.late_count,
|
||
},
|
||
'song': {
|
||
'difficulty': self.score.difficulty,
|
||
'score': self.score.score,
|
||
'cleartype': self.score.cleartype,
|
||
},
|
||
'player_state': self.player_state,
|
||
'last_timestamp': self.last_timestamp,
|
||
}
|
||
|
||
def set_player_name(self, player_name: str):
|
||
self.player_name = player_name.encode('ascii')
|
||
if len(self.player_name) > 16:
|
||
self.player_name = self.player_name[:16]
|
||
else:
|
||
self.player_name += b'\x00' * (16 - len(self.player_name))
|
||
|
||
@property
|
||
def info(self) -> bytes:
|
||
re = bytearray()
|
||
re.extend(b(self.player_id, 8))
|
||
re.append(self.character_id)
|
||
re.append(self.is_uncapped)
|
||
re.append(self.score.difficulty)
|
||
re.extend(b(self.score.score, 4))
|
||
re.extend(b(self.score.timer, 4))
|
||
re.append(self.score.cleartype)
|
||
re.append(self.player_state)
|
||
re.append(self.download_percent)
|
||
re.append(self.online)
|
||
|
||
re.extend(b(self.voting, 2))
|
||
re.append(self.player_index)
|
||
re.append(self.switch_2)
|
||
re.extend(b(self.rating_ptt, 2))
|
||
re.append(self.is_hide_rating)
|
||
re.append(self.is_staff)
|
||
|
||
return bytes(re)
|
||
|
||
@property
|
||
def last_score_info(self) -> bytes:
|
||
if self.player_id == 0:
|
||
return b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||
x = self.last_score
|
||
re = bytearray()
|
||
re.append(self.character_id)
|
||
re.append(x.difficulty)
|
||
re.extend(b(x.score, 4))
|
||
re.append(x.cleartype)
|
||
re.append(x.best_score_flag)
|
||
re.append(x.best_player_flag)
|
||
re.extend(b(x.shiny_perfect_count, 2))
|
||
re.extend(b(x.perfect_count, 2))
|
||
re.extend(b(x.near_count, 2))
|
||
re.extend(b(x.miss_count, 2))
|
||
re.extend(b(x.early_count, 2))
|
||
re.extend(b(x.late_count, 2))
|
||
re.extend(b(x.healthy, 4))
|
||
|
||
return bytes(re)
|
||
|
||
|
||
class Room:
|
||
|
||
def __init__(self) -> None:
|
||
self.room_id = 0
|
||
self.room_code = 'AAAA00'
|
||
self.share_token = 'abcde12345' # 5.10 新增
|
||
|
||
self.countdown = 0xffffffff
|
||
self.timestamp = 0
|
||
self._state = 0
|
||
self.song_idx = 0xffff # 疑似 idx * 5
|
||
self.last_song_idx = 0xffff # 疑似 idx * 5
|
||
|
||
self.song_unlock = b'\xFF' * Config.LINK_PLAY_UNLOCK_LENGTH
|
||
|
||
self.host_id = 0
|
||
self.players = [Player(0), Player(1), Player(2), Player(3)]
|
||
|
||
self.interval = 1000
|
||
self.times = 100 # ???
|
||
|
||
self.round_mode: int = 1 # 5.10 从 bool 修改为 int 1~3
|
||
self.is_public = 0 # 5.10 新增
|
||
self.timed_mode = 0 # 5.10 新增
|
||
|
||
self.selected_voter_player_id: int = 0 # 5.10 新增
|
||
|
||
self.command_queue = []
|
||
|
||
self.next_state_timestamp = 0 # 计时模式下一个状态时间
|
||
|
||
@property
|
||
def state(self) -> int:
|
||
return self._state
|
||
|
||
@state.setter
|
||
def state(self, value: int):
|
||
self._state = value
|
||
self.countdown = 0xffffffff
|
||
|
||
def to_dict(self) -> dict:
|
||
p = [i.to_dict() for i in self.players if i.player_id != 0]
|
||
for i in p:
|
||
i['is_host'] = i['multiplay_player_id'] == self.host_id
|
||
return {
|
||
'room_id': self.room_id,
|
||
'room_code': self.room_code,
|
||
'share_token': self.share_token,
|
||
'state': self.state,
|
||
'song_idx': self.song_idx,
|
||
'last_song_idx': self.last_song_idx if not self.is_playing else 0xffff,
|
||
'host_id': self.host_id,
|
||
'players': p,
|
||
'round_mode': self.round_mode,
|
||
'last_timestamp': self.timestamp,
|
||
'is_enterable': self.is_enterable,
|
||
'is_matchable': self.is_matchable,
|
||
'is_playing': self.is_playing,
|
||
'is_public': self.is_public == 1,
|
||
'timed_mode': self.timed_mode == 1,
|
||
}
|
||
|
||
@property
|
||
def room_info(self) -> bytes:
|
||
re = bytearray()
|
||
re.extend(b(self.host_id, 8))
|
||
re.append(self.state)
|
||
re.extend(b(self.countdown, 4))
|
||
re.extend(b(self.timestamp, 8))
|
||
re.extend(b(self.song_idx, 2))
|
||
re.extend(b(self.interval, 2))
|
||
re.extend(b(self.times, 7))
|
||
re.extend(self.get_player_last_score())
|
||
re.extend(b(self.last_song_idx, 2))
|
||
re.append(self.round_mode)
|
||
re.append(self.is_public)
|
||
re.append(self.timed_mode)
|
||
re.extend(b(self.selected_voter_player_id, 8))
|
||
return bytes(re)
|
||
|
||
@property
|
||
def is_enterable(self) -> bool:
|
||
return 0 < self.player_num < 4 and self.state == 2
|
||
|
||
@property
|
||
def is_matchable(self) -> bool:
|
||
return self.is_public and 0 < self.player_num < 4 and self.state == 1
|
||
|
||
@property
|
||
def is_playing(self) -> bool:
|
||
return self.state in (4, 5, 6, 7)
|
||
|
||
@property
|
||
def command_queue_length(self) -> int:
|
||
return len(self.command_queue)
|
||
|
||
@property
|
||
def player_num(self) -> int:
|
||
now = round(time() * 1000000)
|
||
if now - self.timestamp >= 1000000:
|
||
self.check_player_online(now)
|
||
return sum(i.player_id != 0 for i in self.players)
|
||
|
||
def check_player_online(self, now: int = None):
|
||
# 检测玩家是否被自动踢出房间 / 离线判断
|
||
now = round(time() * 1000000) if now is None else now
|
||
flag = False
|
||
player_index_list = []
|
||
for i, x in enumerate(self.players):
|
||
if x.player_id == 0 or x.last_timestamp == 0:
|
||
continue
|
||
if now - x.last_timestamp >= Config.PLAYER_TIMEOUT:
|
||
self.delete_player(i)
|
||
flag = True
|
||
player_index_list.append(i)
|
||
elif x.online == 1 and now - x.last_timestamp >= Config.PLAYER_PRE_TIMEOUT:
|
||
x.online = 0
|
||
player_index_list.append(i)
|
||
|
||
return flag, player_index_list
|
||
|
||
def get_players_info(self):
|
||
# 获取所有玩家信息
|
||
re = bytearray()
|
||
for i in self.players:
|
||
re.extend(i.info)
|
||
re.append(0)
|
||
re.extend(i.player_name)
|
||
return bytes(re)
|
||
|
||
def get_player_last_score(self):
|
||
# 获取上次曲目玩家分数,返回bytes
|
||
if self.last_song_idx == 0xffff:
|
||
return b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' * 4
|
||
return b''.join(i.last_score_info for i in self.players)
|
||
|
||
def make_round(self):
|
||
# 轮换房主
|
||
for i in range(4):
|
||
if self.players[i].player_id == self.host_id:
|
||
for j in range(1, 4):
|
||
player = self.players[(i + j) % 4]
|
||
if player.player_id != 0:
|
||
self.host_id = player.player_id
|
||
logging.info(
|
||
f'Player `{player.name}` becomes the host of room `{self.room_code}`')
|
||
break
|
||
break
|
||
|
||
def delete_player(self, player_index: int):
|
||
# 删除某个玩家
|
||
player = self.players[player_index]
|
||
if player.player_id == self.host_id:
|
||
self.make_round()
|
||
|
||
logging.info(
|
||
f'Player `{player.name}` leaves room `{self.room_code}`')
|
||
|
||
self.players[player_index].online = 0
|
||
self.players[player_index] = Player(player_index)
|
||
self.update_song_unlock()
|
||
|
||
if self.state in (2, 3):
|
||
self.state = 1
|
||
self.song_idx = 0xffff
|
||
self.voting_clear()
|
||
|
||
if self.state in (1, 2) and self.timed_mode and self.player_num <= 1:
|
||
self.next_state_timestamp = 0
|
||
self.countdown = 0xffffffff
|
||
|
||
def update_song_unlock(self):
|
||
# 更新房间可用歌曲
|
||
r = bi(b'\xff' * Config.LINK_PLAY_UNLOCK_LENGTH)
|
||
for i in self.players:
|
||
if i.player_id != 0:
|
||
r &= bi(i.song_unlock)
|
||
|
||
self.song_unlock = b(r, Config.LINK_PLAY_UNLOCK_LENGTH)
|
||
|
||
def is_ready(self, old_state: int, player_state: int):
|
||
# 是否全部准备就绪
|
||
if self.state == old_state:
|
||
for i in self.players:
|
||
if i.player_id != 0 and (i.player_state != player_state or i.online == 0):
|
||
return False
|
||
|
||
return True
|
||
return False
|
||
|
||
def is_finish(self):
|
||
# 是否全部进入结算
|
||
if self.state != 7:
|
||
return False
|
||
|
||
for i in self.players:
|
||
if i.player_id != 0 and (i.finish_flag == 0 or i.online == 0):
|
||
return False
|
||
|
||
return True
|
||
|
||
def make_finish(self):
|
||
# 结算
|
||
self.state = 8
|
||
self.last_song_idx = self.song_idx
|
||
|
||
max_score = 0
|
||
max_score_i = []
|
||
for i in range(4):
|
||
player = self.players[i]
|
||
if player.player_id == 0:
|
||
continue
|
||
player.finish_flag = 0
|
||
player.last_score.copy(player.score)
|
||
player.last_score.best_player_flag = 0
|
||
|
||
if player.last_score.score > max_score:
|
||
max_score = player.last_score.score
|
||
max_score_i = [i]
|
||
elif player.last_score.score == max_score:
|
||
max_score_i.append(i)
|
||
|
||
for i in max_score_i:
|
||
self.players[i].last_score.best_player_flag = 1
|
||
|
||
self.voting_clear()
|
||
for i in self.players:
|
||
i.score.clear()
|
||
|
||
logging.info(
|
||
f'Room `{self.room_code}` finishes song `{self.song_idx}`')
|
||
for i in self.players:
|
||
if i.player_id != 0:
|
||
logging.info(f'- Player `{i.name}` - {i.last_score}')
|
||
|
||
@property
|
||
def is_all_player_voted(self) -> bool:
|
||
# 是否所有玩家都投票
|
||
if self.state != 2:
|
||
return False
|
||
|
||
for i in self.players:
|
||
if i.player_id != 0 and i.voting == 0x8000:
|
||
return False
|
||
|
||
return True
|
||
|
||
def random_song(self):
|
||
random_list = []
|
||
for i in range(Config.LINK_PLAY_UNLOCK_LENGTH):
|
||
for j in range(8):
|
||
if self.song_unlock[i] & (1 << j):
|
||
random_list.append(i * 8 + j)
|
||
|
||
if not random_list:
|
||
self.song_idx = 0
|
||
else:
|
||
self.song_idx = random_list[randint(0, len(random_list) - 1)]
|
||
|
||
def make_voting(self):
|
||
# 投票
|
||
self.state = 3
|
||
self.selected_voter_player_id = 0
|
||
|
||
random_list = []
|
||
random_list_player_id = []
|
||
for i in self.players:
|
||
if i.player_id == 0 or i.voting == 0xffff or i.voting == 0x8000:
|
||
continue
|
||
random_list.append(i.voting)
|
||
random_list_player_id.append(i.player_id)
|
||
|
||
if random_list:
|
||
idx = randint(0, len(random_list) - 1)
|
||
self.song_idx = random_list[idx] * 5
|
||
self.selected_voter_player_id = random_list_player_id[idx]
|
||
else:
|
||
self.random_song()
|
||
|
||
logging.info(
|
||
f'Room `{self.room_code}` votes song `{self.song_idx}`')
|
||
|
||
def voting_clear(self):
|
||
# 清除投票
|
||
self.selected_voter_player_id = 0
|
||
for i in self.players:
|
||
i.voting = 0x8000
|
||
|
||
@property
|
||
def should_next_state(self) -> bool:
|
||
if not self.timed_mode and self.state not in (4, 5, 6):
|
||
self.countdown = 0xffffffff
|
||
return False
|
||
now = round(time() * 1000000)
|
||
if self.countdown == 0xffffffff:
|
||
# 还没开始计时
|
||
if self.is_public and self.state == 1:
|
||
self.next_state_timestamp = now + Config.COUNTDOWN_MATCHING
|
||
elif self.state == 2:
|
||
self.next_state_timestamp = now + Config.COUNTDOWN_SELECT_SONG
|
||
elif self.state == 3:
|
||
self.next_state_timestamp = now + Config.COUNTDOWN_SELECT_DIFFICULTY
|
||
elif self.state == 4:
|
||
self.next_state_timestamp = now + Config.COUNTDOWN_SONG_READY
|
||
elif self.state == 5 or self.state == 6:
|
||
self.next_state_timestamp = now + Config.COUNTDOWN_SONG_START
|
||
elif self.state == 8:
|
||
self.next_state_timestamp = now + Config.COUNTDOWN_RESULT
|
||
else:
|
||
return False
|
||
|
||
# 不是哥们,616 你脑子怎么长的,上个版本是毫秒时间戳,新版本变成了微秒???那你这倒计时怎么还是毫秒啊!!!
|
||
self.countdown = (self.next_state_timestamp - now) // 1000
|
||
if self.countdown <= 0:
|
||
self.countdown = 0
|
||
return True
|
||
return False
|