[Enhance] Link Play 2.0 e.t.c.

- For Arcaea 5.10.1(c)
- Add support for Link Play 2.0.
- New partners "Luna & Ilot" and "Eto & Hoppe"
- Add support for the skill of "Eto & Hoppe".
- Add support for refreshing ratings of Recent 30 via API and webpage.

Note: This is a bug testing version.
This commit is contained in:
Lost-MSth
2024-09-06 22:43:38 +08:00
parent 59422f96b5
commit 014531f3f1
21 changed files with 1184 additions and 253 deletions

View File

@@ -8,7 +8,7 @@ def encrypt(key, plaintext, associated_data):
iv = urandom(12)
encryptor = Cipher(
algorithms.AES(key),
modes.GCM(iv, min_tag_length=12),
modes.GCM(iv, min_tag_length=16),
).encryptor()
encryptor.authenticate_additional_data(associated_data)
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
@@ -18,7 +18,7 @@ def encrypt(key, plaintext, associated_data):
def decrypt(key, associated_data, iv, ciphertext, tag):
decryptor = Cipher(
algorithms.AES(key),
modes.GCM(iv, tag, min_tag_length=12),
modes.GCM(iv, tag, min_tag_length=16),
).decryptor()
decryptor.authenticate_additional_data(associated_data)
return decryptor.update(ciphertext) + decryptor.finalize()

View File

@@ -24,9 +24,16 @@ class Config:
COMMAND_INTERVAL = 1000000
COUNTDOWM_TIME = 3999
PLAYER_PRE_TIMEOUT = 3000000
PLAYER_TIMEOUT = 20000000
PLAYER_TIMEOUT = 15000000
LINK_PLAY_UNLOCK_LENGTH = 512
COUNTDOWN_SONG_READY = 4 * 1000000
COUNTDOWN_SONG_START = 6 * 1000000
# 计时模式
COUNTDOWN_MATCHING = 15 * 1000000
COUNTDOWN_SELECT_SONG = 45 * 1000000
COUNTDOWN_SELECT_DIFFICULTY = 45 * 1000000
COUNTDOWN_RESULT = 60 * 1000000

View File

@@ -1,4 +1,4 @@
# import binascii
import binascii
import logging
import socketserver
import threading
@@ -21,11 +21,12 @@ class UDP_handler(socketserver.BaseRequestHandler):
try:
token = client_msg[:8]
iv = client_msg[8:20]
tag = client_msg[20:32]
ciphertext = client_msg[32:]
if bi(token) not in Store.link_play_data:
tag = client_msg[20:36]
ciphertext = client_msg[36:]
user = Store.link_play_data.get(bi(token))
if user is None:
return None
user = Store.link_play_data[bi(token)]
plaintext = decrypt(user['key'], b'', iv, ciphertext, tag)
except Exception as e:
@@ -52,8 +53,7 @@ class UDP_handler(socketserver.BaseRequestHandler):
# logging.info(
# f'UDP-To-{self.client_address[0]}-{binascii.b2a_hex(i)}')
server.sendto(token + iv + tag[:12] +
ciphertext, self.client_address)
server.sendto(token + iv + tag + ciphertext, self.client_address)
AUTH_LEN = len(Config.AUTHENTICATION)
@@ -77,7 +77,7 @@ class TCP_handler(socketserver.StreamRequestHandler):
return None
iv = self.rfile.read(12)
tag = self.rfile.read(12)
tag = self.rfile.read(16)
ciphertext = self.rfile.read(cipher_len)
self.data = decrypt(TCP_AES_KEY, b'', iv, ciphertext, tag)
@@ -96,8 +96,8 @@ class TCP_handler(socketserver.StreamRequestHandler):
if Config.DEBUG:
logging.info(f'TCP-To-{self.client_address[0]}-{r}')
iv, ciphertext, tag = encrypt(TCP_AES_KEY, r.encode('utf-8'), b'')
r = len(ciphertext).to_bytes(8, byteorder='little') + \
iv + tag[:12] + ciphertext
r = len(ciphertext).to_bytes(
8, byteorder='little') + iv + tag + ciphertext
except Exception as e:
logging.error(e)
return None

View File

@@ -7,15 +7,18 @@ from time import time
from .config import Config
from .udp_class import Player, Room, bi
from .udp_sender import CommandSender
class Store:
# token: {'key': key, 'room': Room, 'player_index': player_index, 'player_id': player_id}
link_play_data = {}
room_id_dict = {} # 'room_id': Room
room_id_dict: "dict[int, Room]" = {} # 'room_id': Room
room_code_dict = {} # 'room_code': Room
player_dict = {} # 'player_id' : Player
share_token_dict = {} # 'share_token': Room
lock = RLock()
@@ -28,6 +31,14 @@ def random_room_code():
return re
def random_share_token():
CHARSET = 'abcdefghijklmnopqrstuvwxyz0123456789'
re = ''
for _ in range(10):
re += CHARSET[randint(0, 35)]
return re
def unique_random(dataset, length=8, random_func=None):
'''无重复随机且默认非0没处理可能的死循环'''
if random_func is None:
@@ -45,18 +56,27 @@ def clear_player(token):
# 清除玩家信息和token
player_id = Store.link_play_data[token]['player_id']
logging.info(f'Clean player `{Store.player_dict[player_id].name}`')
del Store.player_dict[player_id]
del Store.link_play_data[token]
with Store.lock:
if player_id in Store.player_dict:
del Store.player_dict[player_id]
if token in Store.link_play_data:
del Store.link_play_data[token]
def clear_room(room):
# 清除房间信息
room_id = room.room_id
room_code = room.room_code
share_token = room.share_token
logging.info(f'Clean room `{room_code}`')
del Store.room_id_dict[room_id]
del Store.room_code_dict[room_code]
del room
with Store.lock:
if room_id in Store.room_id_dict:
del Store.room_id_dict[room_id]
if room_code in Store.room_code_dict:
del Store.room_code_dict[room_code]
if share_token in Store.share_token_dict:
del Store.share_token_dict[share_token]
del room
def memory_clean(now):
@@ -92,6 +112,8 @@ class TCPRouter:
'join_room',
'update_room',
'get_rooms',
'select_room',
'get_match_rooms'
}
def __init__(self, raw_data: 'dict | list'):
@@ -115,7 +137,7 @@ class TCPRouter:
def handle(self) -> dict:
self.clean_check()
if self.endpoint not in self.router:
return None
return {'code': 999}
try:
r = getattr(self, self.endpoint)()
except Exception as e:
@@ -144,7 +166,7 @@ class TCPRouter:
room_id = unique_random(Store.room_id_dict)
room = Room()
room.room_id = room_id
room.timestamp = round(time() * 1000)
room.timestamp = round(time() * 1000000)
Store.room_id_dict[room_id] = room
room_code = unique_random(
@@ -152,6 +174,11 @@ class TCPRouter:
room.room_code = room_code
Store.room_code_dict[room_code] = room
share_token = unique_random(
Store.share_token_dict, random_func=random_share_token)
room.share_token = share_token
Store.share_token_dict[share_token] = room
return room
def create_room(self) -> dict:
@@ -160,6 +187,9 @@ class TCPRouter:
# song_unlock: base64 str
name = self.data['name']
song_unlock = b64decode(self.data['song_unlock'])
rating_ptt = self.data.get('rating_ptt', 0)
is_hide_rating = self.data.get('is_hide_rating', False)
match_times = self.data.get('match_times', None)
key = urandom(16)
with Store.lock:
@@ -167,6 +197,9 @@ class TCPRouter:
player = self.generate_player(name)
player.song_unlock = song_unlock
player.rating_ptt = rating_ptt
player.is_hide_rating = is_hide_rating
player.player_index = 0
room.song_unlock = song_unlock
room.host_id = player.player_id
room.players[0] = player
@@ -174,6 +207,12 @@ class TCPRouter:
token = room.room_id
player.token = token
# 匹配模式追加
if match_times is not None:
room.is_public = 1
room.round_mode = 3
room.timed_mode = 1
Store.link_play_data[token] = {
'key': key,
'room': room,
@@ -198,6 +237,9 @@ class TCPRouter:
key = urandom(16)
name = self.data['name']
song_unlock = b64decode(self.data['song_unlock'])
rating_ptt = self.data.get('rating_ptt', 0)
is_hide_rating = self.data.get('is_hide_rating', False)
match_times = self.data.get('match_times', None)
with Store.lock:
if room_code not in Store.room_code_dict:
@@ -212,7 +254,7 @@ class TCPRouter:
if player_num == 0:
# 房间不存在
return 1202
if room.state != 2:
if room.state not in (0, 1, 2) or (room.is_public and match_times is None):
# 无法加入
return 1205
@@ -221,16 +263,18 @@ class TCPRouter:
player = self.generate_player(name)
player.token = token
player.song_unlock = song_unlock
player.rating_ptt = rating_ptt
player.is_hide_rating = is_hide_rating
room.update_song_unlock()
for i in range(4):
if room.players[i].player_id == 0:
room.players[i] = player
player_index = i
player.player_index = i
break
Store.link_play_data[token] = {
'key': key,
'room': room,
'player_index': player_index,
'player_index': player.player_index,
'player_id': player.player_id
}
@@ -248,11 +292,23 @@ class TCPRouter:
# 房间信息更新
# data = ['3', token]
token = int(self.data['token'])
rating_ptt = self.data.get('rating_ptt', 0)
is_hide_rating = self.data.get('is_hide_rating', False)
with Store.lock:
if token not in Store.link_play_data:
return 108
r = Store.link_play_data[token]
room = r['room']
# 更新玩家信息
player_index = r['player_index']
player = room.players[player_index]
player.rating_ptt = rating_ptt
player.is_hide_rating = is_hide_rating
cs = CommandSender(room)
room.command_queue.append(cs.command_12(player_index))
logging.info(f'TCP-Room `{room.room_code}` info update')
return {
'room_code': room.room_code,
@@ -300,3 +356,55 @@ class TCPRouter:
'has_more': f2,
'rooms': rooms
}
def select_room(self) -> dict:
# 查询房间信息
room_code = self.data.get('room_code', None)
share_token = self.data.get('share_token', None)
if room_code is not None:
room = Store.room_code_dict.get(room_code, None)
elif share_token is not None:
room = Store.share_token_dict.get(share_token, None)
if room is None:
return 108
return {
'room_id': room.room_id,
'room_code': room.room_code,
'share_token': room.share_token,
'is_enterable': room.is_enterable,
'is_matchable': room.is_matchable,
'is_playing': room.is_playing,
'is_public': room.is_public == 1,
'timed_mode': room.timed_mode == 1,
}
def get_match_rooms(self):
n = 0
rooms = []
for room in Store.room_id_dict.values():
if not room.is_matchable:
continue
rooms.append({
'room_id': room.room_id,
'room_code': room.room_code,
'share_token': room.share_token,
'is_matchable': room.is_matchable,
'next_state_timestamp': room.next_state_timestamp,
'song_unlock': b64encode(room.song_unlock).decode('utf-8'),
'players': [{
'player_id': i.player_id,
'name': i.name,
'rating_ptt': i.rating_ptt
} for i in room.players]
})
if n >= 100:
break
return {
'amount': n,
'rooms': rooms
}

View File

@@ -1,5 +1,6 @@
import logging
from time import time
from random import randint
from .config import Config
@@ -12,26 +13,73 @@ def bi(value):
return int.from_bytes(value, byteorder='little')
class Player:
class Score:
def __init__(self) -> None:
self.difficulty = 0xff
self.score = 0
self.cleartype = 0
self.timer = 0
self.best_score_flag = 0 # personal best
self.best_player_flag = 0 # high score
# 5.10 新增
self.shiny_perfect_count = 0 # 2 bytes
self.perfect_count = 0 # 2 bytes
self.near_count = 0 # 2 bytes
self.miss_count = 0 # 2 bytes
self.early_count = 0 # 2 bytes
self.late_count = 0 # 2 bytes
self.healthy = 0 # 4 bytes signed? 不确定,但似乎没影响
def copy(self, x: 'Score'):
self.difficulty = x.difficulty
self.score = x.score
self.cleartype = x.cleartype
self.timer = x.timer
self.best_score_flag = x.best_score_flag
self.best_player_flag = x.best_player_flag
self.shiny_perfect_count = x.shiny_perfect_count
self.perfect_count = x.perfect_count
self.near_count = x.near_count
self.miss_count = x.miss_count
self.early_count = x.early_count
self.late_count = x.late_count
self.healthy = x.healthy
def clear(self):
self.difficulty = 0xff
self.score = 0
self.cleartype = 0
self.timer = 0
self.best_score_flag = 0
self.best_player_flag = 0
self.shiny_perfect_count = 0
self.perfect_count = 0
self.near_count = 0
self.miss_count = 0
self.early_count = 0
self.late_count = 0
self.healthy = 0
def __str__(self):
return f'Score: {self.score}, Cleartype: {self.cleartype}, Difficulty: {self.difficulty}, Timer: {self.timer}, Best Score Flag: {self.best_score_flag}, Best Player Flag: {self.best_player_flag}, Shiny Perfect: {self.shiny_perfect_count}, Perfect: {self.perfect_count}, Near: {self.near_count}, Miss: {self.miss_count}, Early: {self.early_count}, Late: {self.late_count}, Healthy: {self.healthy}'
class Player:
def __init__(self, player_index: int = 0) -> None:
self.player_id = 0
self.player_name = b'\x45\x6d\x70\x74\x79\x50\x6c\x61\x79\x65\x72\x00\x00\x00\x00\x00'
self.token = 0
self.character_id = 0xff
self.last_character_id = 0xff
self.is_uncapped = 0
self.difficulty = 0xff
self.last_difficulty = 0xff
self.score = 0
self.last_score = 0
self.timer = 0
self.last_timer = 0
self.cleartype = 0
self.last_cleartype = 0
self.best_score_flag = 0
self.best_player_flag = 0
self.score = Score()
self.last_score = Score()
self.finish_flag = 0
self.player_state = 1
@@ -45,6 +93,16 @@ class Player:
self.start_command_num = 0
# 5.10 新增
self.voting: int = 0x8000 # 2 bytes, song_idx, 0xffff 为不选择0x8000 为默认值
self.player_index: int = player_index # 1 byte 不确定对不对
self.switch_2: int = 0 # 1 byte
self.rating_ptt: int = 0 # 2 bytes
self.is_hide_rating: int = 0 # 1 byte
self.switch_4: int = 0 # 1 byte 只能确定有 00 和 01
@property
def name(self) -> str:
return self.player_name.decode('ascii').rstrip('\x00')
@@ -56,15 +114,23 @@ class Player:
'is_online': self.online == 1,
'character_id': self.character_id,
'is_uncapped': self.is_uncapped == 1,
'rating_ptt': self.rating_ptt,
'is_hide_rating': self.is_hide_rating == 1,
'last_song': {
'difficulty': self.last_difficulty,
'score': self.last_score,
'cleartype': self.last_cleartype,
'difficulty': self.last_score.difficulty,
'score': self.last_score.score,
'cleartype': self.last_score.cleartype,
'shine_perfect': self.last_score.shiny_perfect_count,
'perfect': self.last_score.perfect_count,
'near': self.last_score.near_count,
'miss': self.last_score.miss_count,
'early': self.last_score.early_count,
'late': self.last_score.late_count,
},
'song': {
'difficulty': self.difficulty,
'score': self.score,
'cleartype': self.cleartype,
'difficulty': self.score.difficulty,
'score': self.score.score,
'cleartype': self.score.cleartype,
},
'player_state': self.player_state,
'last_timestamp': self.last_timestamp,
@@ -77,30 +143,92 @@ class Player:
else:
self.player_name += b'\x00' * (16 - len(self.player_name))
@property
def info(self) -> bytes:
re = bytearray()
re.extend(b(self.player_id, 8))
re.append(self.character_id)
re.append(self.is_uncapped)
re.append(self.score.difficulty)
re.extend(b(self.score.score, 4))
re.extend(b(self.score.timer, 4))
re.append(self.score.cleartype)
re.append(self.player_state)
re.append(self.download_percent)
re.append(self.online)
re.extend(b(self.voting, 2))
re.append(self.player_index)
re.append(self.switch_2)
re.extend(b(self.rating_ptt, 2))
re.append(self.is_hide_rating)
re.append(self.switch_4)
return bytes(re)
@property
def last_score_info(self) -> bytes:
if self.player_id == 0:
return b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
x = self.last_score
re = bytearray()
re.append(self.character_id)
re.append(x.difficulty)
re.extend(b(x.score, 4))
re.append(x.cleartype)
re.append(x.best_score_flag)
re.append(x.best_player_flag)
re.extend(b(x.shiny_perfect_count, 2))
re.extend(b(x.perfect_count, 2))
re.extend(b(x.near_count, 2))
re.extend(b(x.miss_count, 2))
re.extend(b(x.early_count, 2))
re.extend(b(x.late_count, 2))
re.extend(b(x.healthy, 4))
return bytes(re)
class Room:
def __init__(self) -> None:
self.room_id = 0
self.room_code = 'AAAA00'
self.share_token = 'abcde12345' # 5.10 新增
self.countdown = 0xffffffff
self.timestamp = 0
self.state = 0
self.song_idx = 0xffff
self.last_song_idx = 0xffff
self._state = 0
self.song_idx = 0xffff # 疑似 idx * 5
self.last_song_idx = 0xffff # 疑似 idx * 5
self.song_unlock = b'\xFF' * Config.LINK_PLAY_UNLOCK_LENGTH
self.host_id = 0
self.players = [Player(), Player(), Player(), Player()]
self.players = [Player(0), Player(1), Player(2), Player(3)]
self.interval = 1000
self.times = 100
self.times = 100 # ???
self.round_switch = 0
self.round_mode: int = 1 # 5.10 从 bool 修改为 int 1~3
self.is_public = 0 # 5.10 新增
self.timed_mode = 0 # 5.10 新增
self.selected_voter_player_id: int = 0 # 5.10 新增
self.command_queue = []
self.next_state_timestamp = 0 # 计时模式下一个状态时间
@property
def state(self) -> int:
return self._state
@state.setter
def state(self, value: int):
self._state = value
self.countdown = 0xffffffff
def to_dict(self) -> dict:
p = [i.to_dict() for i in self.players if i.player_id != 0]
for i in p:
@@ -108,21 +236,47 @@ class Room:
return {
'room_id': self.room_id,
'room_code': self.room_code,
'share_token': self.share_token,
'state': self.state,
'song_idx': self.song_idx,
'last_song_idx': self.last_song_idx if not self.is_playing else 0xffff,
'host_id': self.host_id,
'players': p,
'round_switch': self.round_switch == 1,
'round_mode': self.round_mode,
'last_timestamp': self.timestamp,
'is_enterable': self.is_enterable,
'is_matchable': self.is_matchable,
'is_playing': self.is_playing,
'is_public': self.is_public == 1,
'timed_mode': self.timed_mode == 1,
}
@property
def room_info(self) -> bytes:
re = bytearray()
re.extend(b(self.host_id, 8))
re.append(self.state)
re.extend(b(self.countdown, 4))
re.extend(b(self.timestamp, 8))
re.extend(b(self.song_idx, 2))
re.extend(b(self.interval, 2))
re.extend(b(self.times, 7))
re.extend(self.get_player_last_score())
re.extend(b(self.last_song_idx, 2))
re.append(self.round_mode)
re.append(self.is_public)
re.append(self.timed_mode)
re.extend(b(self.selected_voter_player_id, 8))
return bytes(re)
@property
def is_enterable(self) -> bool:
return 0 < self.player_num < 4 and self.state == 2
@property
def is_matchable(self) -> bool:
return self.is_public and 0 < self.player_num < 4 and self.state == 1
@property
def is_playing(self) -> bool:
return self.state in (4, 5, 6, 7)
@@ -133,7 +287,9 @@ class Room:
@property
def player_num(self) -> int:
self.check_player_online()
now = round(time() * 1000000)
if now - self.timestamp >= 1000000:
self.check_player_online(now)
return sum(i.player_id != 0 for i in self.players)
def check_player_online(self, now: int = None):
@@ -156,29 +312,18 @@ class Room:
def get_players_info(self):
# 获取所有玩家信息
re = b''
re = bytearray()
for i in self.players:
re += b(i.player_id, 8) + b(i.character_id) + b(i.is_uncapped) + b(i.difficulty) + b(i.score, 4) + \
b(i.timer, 4) + b(i.cleartype) + b(i.player_state) + \
b(i.download_percent) + b(i.online) + b'\x00' + i.player_name
return re
re.extend(i.info)
re.append(0)
re.extend(i.player_name)
return bytes(re)
def get_player_last_score(self):
# 获取上次曲目玩家分数返回bytes
if self.last_song_idx == 0xffff:
return b'\xff\xff\x00\x00\x00\x00\x00\x00\x00' * 4
re = b''
for i in range(4):
player = self.players[i]
if player.player_id != 0:
re += b(player.last_character_id) + b(player.last_difficulty) + b(player.last_score, 4) + b(
player.last_cleartype) + b(player.best_score_flag) + b(player.best_player_flag)
else:
re += b'\xff\xff\x00\x00\x00\x00\x00\x00\x00'
return re
return b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' * 4
return b''.join(i.last_score_info for i in self.players)
def make_round(self):
# 轮换房主
@@ -203,9 +348,19 @@ class Room:
f'Player `{player.name}` leaves room `{self.room_code}`')
self.players[player_index].online = 0
self.players[player_index] = Player()
self.players[player_index] = Player(player_index)
self.update_song_unlock()
if self.state in (2, 3):
self.state = 1
self.song_idx = 0xffff
self.voting_clear()
print(self.player_num)
if self.state in (1, 2) and self.timed_mode and self.player_num <= 1:
self.next_state_timestamp = 0
self.countdown = 0xffffffff
def update_song_unlock(self):
# 更新房间可用歌曲
r = bi(b'\xff' * Config.LINK_PLAY_UNLOCK_LENGTH)
@@ -245,27 +400,110 @@ class Room:
max_score_i = []
for i in range(4):
player = self.players[i]
if player.player_id != 0:
player.finish_flag = 0
player.last_timer = player.timer
player.last_score = player.score
player.last_cleartype = player.cleartype
player.last_character_id = player.character_id
player.last_difficulty = player.difficulty
player.best_player_flag = 0
if player.player_id == 0:
continue
player.finish_flag = 0
player.last_score.copy(player.score)
player.last_score.best_player_flag = 0
if player.last_score > max_score:
max_score = player.last_score
max_score_i = [i]
elif player.last_score == max_score:
max_score_i.append(i)
if player.last_score.score > max_score:
max_score = player.last_score.score
max_score_i = [i]
elif player.last_score.score == max_score:
max_score_i.append(i)
for i in max_score_i:
self.players[i].best_player_flag = 1
self.players[i].last_score.best_player_flag = 1
self.voting_clear()
for i in self.players:
i.score.clear()
logging.info(
f'Room `{self.room_code}` finishes song `{self.song_idx}`')
for i in self.players:
if i.player_id != 0:
logging.info(
f'- Player `{i.name}` - Score: {i.last_score} Cleartype: {i.last_cleartype} Difficulty: {i.last_difficulty}')
logging.info(f'- Player `{i.name}` - {i.last_score}')
@property
def is_all_player_voted(self) -> bool:
# 是否所有玩家都投票
if self.state != 2:
return False
for i in self.players:
if i.player_id != 0 and i.voting == 0x8000:
return False
return True
def random_song(self):
random_list = []
for i in range(Config.LINK_PLAY_UNLOCK_LENGTH):
for j in range(8):
if self.song_unlock[i] & (1 << j):
random_list.append(i * 8 + j)
if not random_list:
self.song_idx = 0
else:
self.song_idx = random_list[randint(0, len(random_list) - 1)]
def make_voting(self):
# 投票
self.state = 3
self.selected_voter_player_id = 0
random_list = []
random_list_player_id = []
for i in self.players:
if i.player_id == 0 or i.voting == 0xffff or i.voting == 0x8000:
continue
random_list.append(i.voting)
random_list_player_id.append(i.player_id)
if random_list:
idx = randint(0, len(random_list) - 1)
self.song_idx = random_list[idx] * 5
self.selected_voter_player_id = random_list_player_id[idx]
else:
self.random_song()
logging.info(
f'Room `{self.room_code}` votes song `{self.song_idx}`')
def voting_clear(self):
# 清除投票
self.selected_voter_player_id = 0
for i in self.players:
i.voting = 0x8000
@property
def should_next_state(self) -> bool:
if not self.timed_mode and self.state not in (4, 5, 6):
self.countdown = 0xffffffff
return False
now = round(time() * 1000000)
if self.countdown == 0xffffffff:
# 还没开始计时
if self.is_public and self.state == 1:
self.next_state_timestamp = now + Config.COUNTDOWN_MATCHING
elif self.state == 2:
self.next_state_timestamp = now + Config.COUNTDOWN_SELECT_SONG
elif self.state == 3:
self.next_state_timestamp = now + Config.COUNTDOWN_SELECT_DIFFICULTY
elif self.state == 4:
self.next_state_timestamp = now + Config.COUNTDOWN_SONG_READY
elif self.state == 5 or self.state == 6:
self.next_state_timestamp = now + Config.COUNTDOWN_SONG_START
elif self.state == 8:
self.next_state_timestamp = now + Config.COUNTDOWN_RESULT
else:
return False
# 不是哥们616 你脑子怎么长的,上个版本是毫秒时间戳,新版本变成了微秒???那你这倒计时怎么还是毫秒啊!!!
self.countdown = (self.next_state_timestamp - now) // 1000
if self.countdown <= 0:
self.countdown = 0
return True
return False

View File

@@ -1,14 +1,27 @@
import logging
import time
from .udp_class import Room, bi
from .config import Config
from .udp_class import Room, bi
from .udp_sender import CommandSender
class CommandParser:
route = [None, 'command_01', 'command_02', 'command_03', 'command_04', 'command_05',
'command_06', 'command_07', 'command_08', 'command_09', 'command_0a', 'command_0b']
route = {
0x01: 'command_01',
0x02: 'command_02',
0x03: 'command_03',
0x04: 'command_04',
0x06: 'command_06',
0x07: 'command_07',
0x08: 'command_08',
0x09: 'command_09',
0x0a: 'command_0a',
0x0b: 'command_0b',
0x20: 'command_20',
0x22: 'command_22',
0x23: 'command_23',
}
def __init__(self, room: Room, player_index: int = 0) -> None:
self.room = room
@@ -31,7 +44,7 @@ class CommandParser:
re.append(self.room.command_queue[i])
if self.room.players[self.player_index].extra_command_queue:
re += self.room.players[self.player_index].extra_command_queue
re += self.room.players[self.player_index].extra_command_queue[-12:]
self.room.players[self.player_index].extra_command_queue = []
if r:
@@ -52,10 +65,14 @@ class CommandParser:
self.room.command_queue.append(self.s.command_10())
def command_02(self):
# 房主选歌
if self.room.round_mode == 3:
logging.warning('Error: round_mode == 3 in command 02')
return None
self.s.random_code = self.command[16:24]
song_idx = bi(self.command[24:26])
flag = 2
flag = 5
if self.room.state == 2:
flag = 0
self.room.state = 3
@@ -69,10 +86,17 @@ class CommandParser:
# 尝试进入结算
self.s.random_code = self.command[16:24]
player = self.room.players[self.player_index]
player.score = bi(self.command[24:28])
player.cleartype = self.command[28]
player.difficulty = self.command[29]
player.best_score_flag = self.command[30]
player.score.score = bi(self.command[24:28])
player.score.cleartype = self.command[28]
player.score.difficulty = self.command[29]
player.score.best_score_flag = self.command[30]
player.score.shiny_perfect_count = bi(self.command[31:33])
player.score.perfect_count = bi(self.command[33:35])
player.score.near_count = bi(self.command[35:37])
player.score.miss_count = bi(self.command[37:39])
player.score.early_count = bi(self.command[39:41])
player.score.late_count = bi(self.command[41:43])
player.score.healthy = bi(self.command[43:47])
player.finish_flag = 1
player.last_timestamp -= Config.COMMAND_INTERVAL
self.room.last_song_idx = self.room.song_idx
@@ -94,19 +118,16 @@ class CommandParser:
flag = 1
self.room.delete_player(i)
self.room.command_queue.append(self.s.command_12(i))
self.room.update_song_unlock()
self.room.command_queue.append(self.s.command_14())
break
return [self.s.command_0d(flag)]
def command_05(self):
pass
def command_06(self):
self.s.random_code = self.command[16:24]
self.room.state = 1
self.room.song_idx = 0xffff
self.room.voting_clear()
self.room.command_queue.append(self.s.command_13())
@@ -117,13 +138,17 @@ class CommandParser:
self.room.command_queue.append(self.s.command_14())
# 07 可能需要一个 0d 响应code = 0x0b
def command_08(self):
self.room.round_switch = bi(self.command[24:25])
self.s.random_code = self.command[16:24]
self.room.command_queue.append(self.s.command_13())
# 可能弃用
logging.warning('Command 08 is outdated')
pass
# self.room.round_mode = bi(self.command[24:25])
# self.s.random_code = self.command[16:24]
# self.room.command_queue.append(self.s.command_13())
def command_09(self):
re = []
self.s.random_code = self.command[16:24]
player = self.room.players[self.player_index]
@@ -133,133 +158,166 @@ class CommandParser:
self.room.update_song_unlock()
player.start_command_num = self.room.command_queue_length
self.room.command_queue.append(self.s.command_15())
else:
if self.s.timestamp - player.last_timestamp >= Config.COMMAND_INTERVAL:
re.append(self.s.command_0c())
player.last_timestamp = self.s.timestamp
return None
# 离线判断
flag_13, player_index_list = self.room.check_player_online(
self.s.timestamp)
for i in player_index_list:
self.room.command_queue.append(self.s.command_12(i))
flag_0c = False
flag_11 = False
flag_12 = False
if self.s.timestamp - player.last_timestamp >= Config.COMMAND_INTERVAL:
flag_0c = True
player.last_timestamp = self.s.timestamp
if player.online == 0:
flag_12 = True
player.online = 1
# 离线判断
flag_13, player_index_list = self.room.check_player_online(
self.s.timestamp)
for i in player_index_list:
self.room.command_queue.append(self.s.command_12(i))
if self.room.is_ready(1, 1):
flag_13 = True
self.room.state = 2
flag_11 = False
flag_12 = False
if player.player_state != self.command[32]:
flag_12 = True
player.player_state = self.command[32]
if player.online == 0:
flag_12 = True
player.online = 1
if player.difficulty != self.command[33] and player.player_state != 5 and player.player_state != 6 and player.player_state != 7 and player.player_state != 8:
flag_12 = True
player.difficulty = self.command[33]
if self.room.state in (1, 2) and player.player_state == 8:
# 还在结算给踢了
# 冗余,为了保险
self.room.delete_player(self.player_index)
self.room.command_queue.append(
self.s.command_12(self.player_index))
self.room.command_queue.append(self.s.command_14())
if player.cleartype != self.command[34] and player.player_state != 7 and player.player_state != 8:
flag_12 = True
player.cleartype = self.command[34]
if self.room.is_ready(1, 1) and ((self.room.player_num > 1 and not self.room.is_public) or (self.room.is_public and self.room.player_num == 4)):
flag_13 = True
self.room.state = 2
if player.download_percent != self.command[35]:
flag_12 = True
player.download_percent = self.command[35]
if self.room.state == 1 and self.room.is_public and self.room.player_num > 1 and self.room.should_next_state:
flag_0c = True
flag_13 = True
self.room.state = 2
if player.character_id != self.command[36]:
flag_12 = True
player.character_id = self.command[36]
if self.room.state in (2, 3) and self.room.player_num < 2:
flag_13 = True
self.room.state = 1
if player.is_uncapped != self.command[37]:
flag_12 = True
player.is_uncapped = self.command[37]
if self.room.state == 2 and self.room.should_next_state:
flag_0c = True
self.room.state = 3
flag_13 = True
if self.room.round_mode == 3:
self.room.make_voting()
else:
self.room.random_song()
if self.room.state == 3 and player.score != bi(self.command[24:28]):
flag_12 = True
player.score = bi(self.command[24:28])
if player.player_state != self.command[32]:
flag_12 = True
player.player_state = self.command[32]
if self.room.is_ready(3, 4):
flag_13 = True
self.room.countdown = Config.COUNTDOWM_TIME
self.room.timestamp = round(time.time() * 1000)
self.room.state = 4
if self.room.round_switch == 1:
# 将换房主时间提前到此刻
self.room.make_round()
if player.score.difficulty != self.command[33] and player.player_state not in (5, 6, 7, 8):
flag_12 = True
player.score.difficulty = self.command[33]
logging.info(f'Room `{self.room.room_code}` starts playing')
if player.score.cleartype != self.command[34] and player.player_state != 7 and player.player_state != 8:
flag_12 = True
player.score.cleartype = self.command[34]
if self.room.state in (4, 5, 6):
timestamp = round(time.time() * 1000)
self.room.countdown -= timestamp - self.room.timestamp
self.room.timestamp = timestamp
if self.room.state == 4 and self.room.countdown <= 0:
# 此处不清楚
self.room.state = 5
self.room.countdown = 5999
flag_11 = True
flag_13 = True
if player.download_percent != self.command[35]:
flag_12 = True
player.download_percent = self.command[35]
if self.room.state == 5 and self.room.is_ready(5, 6):
self.room.state = 6
flag_13 = True
if player.character_id != self.command[36]:
flag_12 = True
player.character_id = self.command[36]
if self.room.state == 5 and self.room.is_ready(5, 7):
self.room.state = 7
self.room.countdown = 0xffffffff
flag_13 = True
if player.is_uncapped != self.command[37]:
flag_12 = True
player.is_uncapped = self.command[37]
if self.room.state == 5 and self.room.countdown <= 0:
print('我怎么知道这是啥')
if self.room.state == 3 and player.score.score != bi(self.command[24:28]):
flag_12 = True
player.score.score = bi(self.command[24:28])
if self.room.state == 6 and self.room.countdown <= 0:
# 此处不清楚
self.room.state = 7
self.room.countdown = 0xffffffff
flag_13 = True
if self.room.is_ready(3, 4) or (self.room.state == 3 and self.room.should_next_state):
flag_13 = True
flag_0c = True
self.room.state = 4
self.room.countdown = self.room.countdown if self.room.countdown > 0 else 0
if self.room.round_mode == 2:
# 将换房主时间提前到此刻
self.room.make_round()
logging.info(f'Room `{self.room.room_code}` starts playing')
if self.room.state in (7, 8):
if player.timer < bi(self.command[28:32]) or bi(self.command[28:32]) == 0 and player.timer != 0:
player.last_timer = player.timer
player.last_score = player.score
player.timer = bi(self.command[28:32])
player.score = bi(self.command[24:28])
if player.timer != 0 or self.room.state != 8:
for i in self.room.players:
i.extra_command_queue.append(
self.s.command_0e(self.player_index))
if self.room.is_ready(8, 1):
flag_13 = True
self.room.state = 1
self.room.song_idx = 0xffff
for i in self.room.players:
i.timer = 0
i.score = 0
if self.room.is_finish():
# 有人退房导致的结算
self.room.make_finish()
flag_13 = True
if flag_11:
self.room.command_queue.append(self.s.command_11())
if flag_12:
if self.room.state == 4:
if player.download_percent != 0xff:
# 有人没下载完把他踢了!
self.room.delete_player(self.player_index)
self.room.command_queue.append(
self.s.command_12(self.player_index))
if flag_13:
self.room.command_queue.append(self.s.command_13())
self.room.command_queue.append(self.s.command_14())
return re
if self.room.should_next_state:
self.room.state = 5
flag_11 = True
flag_13 = True
if self.room.state == 5:
flag_13 = True
if self.room.is_ready(5, 6):
self.room.state = 6
if self.room.is_ready(5, 7):
self.room.state = 7
if self.room.state in (5, 6) and self.room.should_next_state:
# 此处不清楚
self.room.state = 7
flag_13 = True
if self.room.state in (7, 8):
player_now_timer = bi(self.command[28:32])
if player.score.timer < player_now_timer or player_now_timer == 0 and player.score.timer != 0:
player.last_score.timer = player.score.timer
player.last_score.score = player.score.score
player.score.timer = player_now_timer
player.score.score = bi(self.command[24:28])
if player.score.timer != 0 or self.room.state != 8:
for i in self.room.players:
i.extra_command_queue.append(
self.s.command_0e(self.player_index))
if self.room.is_ready(8, 1):
flag_13 = True
self.room.state = 1
self.room.song_idx = 0xffff
if self.room.state == 8 and self.room.should_next_state:
flag_0c = True
flag_13 = True
self.room.state = 1
self.room.song_idx = 0xffff
if self.room.state in (1, 2) and player.player_state == 8:
# 还在结算给踢了
self.room.delete_player(self.player_index)
self.room.command_queue.append(
self.s.command_12(self.player_index))
self.room.command_queue.append(self.s.command_14())
if self.room.is_finish():
# 有人退房导致的结算
self.room.make_finish()
flag_13 = True
if flag_11:
self.room.command_queue.append(self.s.command_11())
if flag_12:
self.room.command_queue.append(
self.s.command_12(self.player_index))
if flag_13:
self.room.command_queue.append(self.s.command_13())
if flag_0c:
return [self.s.command_0c()]
def command_0a(self):
# 退出房间
@@ -267,9 +325,6 @@ class CommandParser:
self.room.command_queue.append(self.s.command_12(self.player_index))
if self.room.state in (2, 3):
self.room.state = 1
self.room.song_idx = 0xffff
# self.room.command_queue.append(self.s.command_11())
self.room.command_queue.append(self.s.command_13())
self.room.command_queue.append(self.s.command_14())
@@ -281,3 +336,45 @@ class CommandParser:
if self.player_index != i and self.room.players[i].online == 1:
self.room.players[i].extra_command_queue.append(
self.s.command_0f(self.player_index, song_idx))
def command_20(self):
# 表情
sticker_id = bi(self.command[16:18])
for i in range(4):
if self.player_index != i and self.room.players[i].online == 1:
self.room.players[i].extra_command_queue.append(
self.s.command_21(self.player_index, sticker_id))
def command_22(self):
# 房间设置,懒得判断房主
self.s.random_code = self.command[16:24]
self.room.is_public = self.command[25]
if self.room.is_public == 0:
self.room.round_mode = self.command[24]
self.room.timed_mode = self.command[26]
else:
self.room.round_mode = 3
self.room.timed_mode = 1
self.room.state = 1
self.room.command_queue.append(self.s.command_11())
self.room.command_queue.append(self.s.command_13())
return [self.s.command_0d(1)]
def command_23(self):
# 歌曲投票
self.s.random_code = self.command[16:24]
if self.room.player_num < 2:
return [self.s.command_0d(6)]
if self.room.state != 2:
return [self.s.command_0d(5)]
player = self.room.players[self.player_index]
player.voting = bi(self.command[24:26])
logging.info(
f'Player `{player.name}` votes for song `{player.voting}`')
self.room.command_queue.append(self.s.command_12(self.player_index))
if self.room.is_all_player_voted:
self.room.make_voting()
self.room.command_queue.append(self.s.command_13())
return [self.s.command_0d(1)]

View File

@@ -1,28 +1,43 @@
from os import urandom
from time import time
from .udp_class import Room, b
PADDING = [b(i) * i for i in range(16)] + [b'']
class CommandSender:
PROTOCOL_NAME = b'\x06\x16'
PROTOCOL_VERSION = b'\x09'
PROTOCOL_VERSION = b'\x0D'
def __init__(self, room: Room = None) -> None:
self.room = room
self.timestamp = round(time() * 1000000)
self.room.timestamp = self.timestamp + 1
self.random_code = b'\x11\x11\x11\x11\x00\x00\x00\x00'
self._random_code = None
@property
def random_code(self):
if self._random_code is None:
self._random_code = urandom(4) + b'\x00\x00\x00\x00'
return self._random_code
@random_code.setter
def random_code(self, value):
self._random_code = value
@staticmethod
def command_encode(t: tuple):
r = b''.join(t)
x = 16 - len(r) % 16
return r + b(x) * x
return r + PADDING[x]
def command_prefix(self, command: bytes):
length = self.room.command_queue_length
if command >= b'\x10':
if b'\x10' <= command <= b'\x1f':
length += 1
return (self.PROTOCOL_NAME, command, self.PROTOCOL_VERSION, b(self.room.room_id, 8), b(length, 4))
@@ -31,12 +46,18 @@ class CommandSender:
return self.command_encode((*self.command_prefix(b'\x0c'), self.random_code, b(self.room.state), b(self.room.countdown, 4), b(self.timestamp, 8)))
def command_0d(self, code: int):
# 3 你不是房主
# 5 有玩家目前无法开始
# 6 需要更多玩家以开始
# 7 有玩家无法游玩这首歌
return self.command_encode((*self.command_prefix(b'\x0d'), self.random_code, b(code)))
def command_0e(self, player_index: int):
# 分数广播
# 我猜616 写错了,首先 4 个 00 大概是分数使用了 8 bytes 转换,其次上一个分数根本就不需要哈哈哈哈哈哈!
player = self.room.players[player_index]
return self.command_encode((*self.command_prefix(b'\x0e'), b(player.player_id, 8), b(player.character_id), b(player.is_uncapped), b(player.difficulty), b(player.score, 4), b(player.timer, 4), b(player.cleartype), b(player.player_state), b(player.download_percent), b'\x01', b(player.last_score, 4), b(player.last_timer, 4), b(player.online)))
return self.command_encode((*self.command_prefix(b'\x0e'), player.info, b(player.last_score.score, 4), b'\x00' * 4, b(player.last_score.timer, 4), b'\x00' * 4))
def command_0f(self, player_index: int, song_idx: int):
# 歌曲推荐
@@ -52,13 +73,17 @@ class CommandSender:
def command_12(self, player_index: int):
player = self.room.players[player_index]
return self.command_encode((*self.command_prefix(b'\x12'), self.random_code, b(player_index), b(player.player_id, 8), b(player.character_id), b(player.is_uncapped), b(player.difficulty), b(player.score, 4), b(player.timer, 4), b(player.cleartype), b(player.player_state), b(player.download_percent), b(player.online)))
return self.command_encode((*self.command_prefix(b'\x12'), self.random_code, b(player_index), player.info))
def command_13(self):
return self.command_encode((*self.command_prefix(b'\x13'), self.random_code, b(self.room.host_id, 8), b(self.room.state), b(self.room.countdown, 4), b(self.timestamp, 8), b(self.room.song_idx, 2), b(self.room.interval, 2), b(self.room.times, 7), self.room.get_player_last_score(), b(self.room.last_song_idx, 2), b(self.room.round_switch, 1)))
return self.command_encode((*self.command_prefix(b'\x13'), self.random_code, self.room.room_info))
def command_14(self):
return self.command_encode((*self.command_prefix(b'\x14'), self.random_code, self.room.song_unlock))
def command_15(self):
return self.command_encode((*self.command_prefix(b'\x15'), self.room.get_players_info(), self.room.song_unlock, b(self.room.host_id, 8), b(self.room.state), b(self.room.countdown, 4), b(self.timestamp, 8), b(self.room.song_idx, 2), b(self.room.interval, 2), b(self.room.times, 7), self.room.get_player_last_score(), b(self.room.last_song_idx, 2), b(self.room.round_switch, 1)))
return self.command_encode((*self.command_prefix(b'\x15'), self.room.get_players_info(), self.room.song_unlock, self.room.room_info))
def command_21(self, player_index: int, sticker_id: int):
player = self.room.players[player_index]
return self.command_encode((*self.command_prefix(b'\x21'), b(player.player_id, 8), b(sticker_id, 2)))