[Enhance] Recent 30

- Update Recent 30 mechanism.
- Alter Recent 30 table structure.

Note:
1. This is a TEST version. Maybe there are many bugs.
2. This special version is a line of demarcation.
This commit is contained in:
Lost-MSth
2024-04-30 00:27:23 +08:00
parent efedd96908
commit 5c539bdf59
10 changed files with 201 additions and 215 deletions

View File

@@ -385,53 +385,9 @@ class UserPlay(UserScore):
(self.course_play_state, self.course_play.score, self.course_play.clear_type, self.song_token))
def clear_play_state(self) -> None:
self.c.execute('''delete from songplay_token where user_id=:a ''', {
self.c.execute('''delete from songplay_token where user_id=:a''', {
'a': self.user.user_id})
def update_recent30(self) -> None:
'''更新此分数对应用户的recent30'''
old_recent_10 = self.ptt.recent_10
if self.is_protected:
old_r30 = self.ptt.r30.copy()
old_s30 = self.ptt.s30.copy()
# 寻找pop位置
songs = list(set(self.ptt.s30))
if '' in self.ptt.s30:
r30_id = 29
else:
n = len(songs)
if n >= 11:
r30_id = 29
elif self.song.song_id_difficulty not in songs and n == 10:
r30_id = 29
elif self.song.song_id_difficulty in songs and n == 10:
i = 29
while self.ptt.s30[i] == self.song.song_id_difficulty and i > 0:
i -= 1
r30_id = i
elif self.song.song_id_difficulty not in songs and n == 9:
i = 29
while self.ptt.s30.count(self.ptt.s30[i]) == 1 and i > 0:
i -= 1
r30_id = i
else:
r30_id = 29
self.ptt.recent_30_update(
r30_id, self.rating, self.song.song_id_difficulty)
if self.is_protected and old_recent_10 > self.ptt.recent_10:
if self.song.song_id_difficulty in old_s30:
# 发现重复歌曲更新到最高rating
index = old_s30.index(self.song.song_id_difficulty)
if old_r30[index] < self.rating:
old_r30[index] = self.rating
self.ptt.r30 = old_r30
self.ptt.s30 = old_s30
self.ptt.insert_recent_30()
def record_score(self) -> None:
'''向log数据库记录分数请注意列名不同'''
logdb_execute('''insert into user_score values(?,?,?,?,?,?,?,?,?,?,?,?,?)''', (self.user.user_id, self.song.song_id, self.song.difficulty, self.time_played,
@@ -490,7 +446,7 @@ class UserPlay(UserScore):
self.ptt = Potential(self.c, self.user)
if not self.unrank_flag:
self.update_recent30()
self.ptt.r30_push_score(self)
# 总PTT更新
user_rating_ptt = self.ptt.value
@@ -527,9 +483,8 @@ class Potential:
self.c = c
self.user = user
self.r30: 'list[float]' = None
self.s30: 'list[str]' = None
self.songs_selected: list = None
self.r30_tuples: 'list[tuple[int, str, int, float]]' = None
self.r30: 'list[Score]' = None
self.b30: list = None
@@ -545,75 +500,110 @@ class Potential:
'a': self.user.user_id})
return sum(x[0] for x in self.c.fetchall())
def select_recent_30(self) -> None:
def select_recent_30_tuple(self) -> None:
'''获取用户recent30数据'''
self.c.execute(
'''select * from recent30 where user_id = :a''', {'a': self.user.user_id})
x = self.c.fetchone()
if not x:
raise NoData(
f'No recent30 data for user `{self.user.user_id}`', api_error_code=-3)
'''select r_index, song_id, difficulty, rating from recent30 where user_id = ? order by time_played DESC''', (self.user.user_id,))
self.r30_tuples = [x for x in self.c.fetchall() if x[1] != '']
def select_recent_30(self) -> None:
self.c.execute(
'''select song_id, difficulty, score, shiny_perfect_count, perfect_count, near_count, miss_count, health, modifier, time_played, clear_type, rating from recent30 where user_id = ? order by time_played DESC''', (self.user.user_id,))
self.r30 = []
self.s30 = []
if not x:
return None
for i in range(1, 61, 2):
if x[i] is not None:
self.r30.append(float(x[i]))
self.s30.append(x[i+1])
else:
self.r30.append(0)
self.s30.append('')
for x in self.c.fetchall():
if x[0] == '':
continue
s = Score()
s.song.set_chart(x[0], x[1])
s.set_score(*x[2:-1])
s.rating = x[-1]
self.r30.append(s)
@property
def recent_10(self) -> float:
'''获取用户recent10的总潜力值'''
if self.r30 is None:
self.select_recent_30()
if self.r30_tuples is None:
self.select_recent_30_tuple()
rating_sum = 0
r30, s30 = (list(t) for t in zip(
*sorted(zip(self.r30, self.s30), reverse=True)))
max_dict = {}
for x in self.r30_tuples:
if (x[1], x[2]) not in max_dict or max_dict[(x[1], x[2])] < x[3]:
max_dict[(x[1], x[2])] = x[3]
self.songs_selected = []
i = 0
while len(self.songs_selected) < 10 and i <= 29 and s30[i] != '' and s30[i] is not None:
if s30[i] not in self.songs_selected:
rating_sum += r30[i]
self.songs_selected.append(s30[i])
i += 1
return rating_sum
top_10_rating = sorted(max_dict.values(), reverse=True)[:10]
return sum(top_10_rating)
def recent_30_to_dict_list(self) -> list:
if self.r30 is None:
self.select_recent_30()
r = []
for x, y in zip(self.s30, self.r30):
if x:
r.append({
'song_id': x[:-1],
'difficulty': int(x[-1]),
'rating': y
})
return r
def recent_30_update(self, pop_index: int, rating: float, song_id_difficulty: str) -> None:
self.r30.pop(pop_index)
self.s30.pop(pop_index)
self.r30.insert(0, rating)
self.s30.insert(0, song_id_difficulty)
return [x.to_dict() for x in self.r30]
def insert_recent_30(self) -> None:
'''更新r30表数据'''
sql = '''update recent30 set r0=?,song_id0=?,r1=?,song_id1=?,r2=?,song_id2=?,r3=?,song_id3=?,r4=?,song_id4=?,r5=?,song_id5=?,r6=?,song_id6=?,r7=?,song_id7=?,r8=?,song_id8=?,r9=?,song_id9=?,r10=?,song_id10=?,r11=?,song_id11=?,r12=?,song_id12=?,r13=?,song_id13=?,r14=?,song_id14=?,r15=?,song_id15=?,r16=?,song_id16=?,r17=?,song_id17=?,r18=?,song_id18=?,r19=?,song_id19=?,r20=?,song_id20=?,r21=?,song_id21=?,r22=?,song_id22=?,r23=?,song_id23=?,r24=?,song_id24=?,r25=?,song_id25=?,r26=?,song_id26=?,r27=?,song_id27=?,r28=?,song_id28=?,r29=?,song_id29=? where user_id=?'''
sql_list = []
for i in range(30):
sql_list.append(self.r30[i])
sql_list.append(self.s30[i])
def update_one_r30(self, r_index: int, user_score: 'UserPlay | UserScore') -> None:
'''更新数据表中的一条数据'''
self.c.execute('''insert or replace into recent30 values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)''',
(self.user.user_id, r_index, user_score.time_played, user_score.song.song_id, user_score.song.difficulty,
user_score.score, user_score.shiny_perfect_count, user_score.perfect_count, user_score.near_count, user_score.miss_count, user_score.health, user_score.modifier, user_score.clear_type, user_score.rating))
sql_list.append(self.user.user_id)
def r30_push_score(self, user_score: 'UserPlay | UserScore') -> None:
'''根据新成绩调整 r30'''
if self.r30_tuples is None:
self.select_recent_30_tuple()
self.c.execute(sql, sql_list)
if len(self.r30_tuples) < 30:
self.update_one_r30(len(self.r30_tuples), user_score)
return None
if user_score.is_protected:
# 保护,替换最低的最旧的成绩
f_tuples = list(
filter(lambda x: x[-1] <= user_score.rating, self.r30_tuples))
f_tuples.reverse() # 从旧到新
f_tuples = sorted(f_tuples, key=lambda x: x[-1])
if not f_tuples:
# 找不到更低的成绩,不更新
return None
unique_songs: 'dict[tuple[str, int], list[tuple[int, int, float]]]' = {}
for i, x in enumerate(self.r30_tuples):
unique_songs.setdefault((x[1], x[2]), []).append((i, x[0], x[3]))
new_song = user_score.song.to_tuple()
if len(unique_songs) >= 11 or (len(unique_songs) == 10 and new_song not in unique_songs):
if user_score.is_protected:
# 保护,替换最低的最旧的成绩
self.update_one_r30(f_tuples[0][0], user_score)
else:
self.update_one_r30(self.r30_tuples[-1][0], user_score)
return None
filtered_songs = dict(filter(lambda x: len(
x[1]) > 1, unique_songs.items())) # 过滤掉只有单个成绩的
if new_song in unique_songs and new_song not in filtered_songs:
# 如果新成绩有相同谱面的唯一成绩在 r30 中,则它也应该有可能被替换
filtered_songs[new_song] = unique_songs[new_song]
if user_score.is_protected:
# 保护,替换最低的最旧的成绩,此时需在 filtered_songs 中
for x in f_tuples:
if (x[1], x[2]) in filtered_songs:
self.update_one_r30(x[0], user_score)
return None
else:
# 找到符合条件的最旧成绩
max_idx = -1
max_r_index = -1
for x in filtered_songs.values():
for y in x:
if y[0] > max_idx:
max_idx = y[0]
max_r_index = y[1]
self.update_one_r30(max_r_index, user_score)
class UserScoreList: