Merge pull request 'Added Team and Rival support to Chunithm' (#24) from EmmyHeart/artemis:develop into develop

Reviewed-on: https://gitea.tendokyu.moe/Hay1tsme/artemis/pulls/24
This commit is contained in:
Midorica
2023-10-06 01:48:50 +00:00
7 changed files with 285 additions and 12 deletions

View File

@@ -361,11 +361,98 @@ class ChuniBase:
"userDuelList": duel_list,
}
def handle_get_user_rival_data_api_request(self, data: Dict) -> Dict:
p = self.data.profile.get_rival(data["rivalId"])
if p is None:
return {}
userRivalData = {
"rivalId": p.user,
"rivalName": p.userName
}
return {
"userId": data["userId"],
"userRivalData": userRivalData
}
def handle_get_user_rival_music_api_request(self, data: Dict) -> Dict:
m = self.data.score.get_rival_music(data["rivalId"], data["nextIndex"], data["maxCount"])
if m is None:
return {}
user_rival_music_list = []
for music in m:
actual_music_id = self.data.static.get_song(music["musicId"])
if actual_music_id is None:
music_id = music["musicId"]
else:
music_id = actual_music_id["songId"]
level = music["level"]
score = music["score"]
rank = music["rank"]
# Find the existing entry for the current musicId in the user_rival_music_list
music_entry = next((entry for entry in user_rival_music_list if entry["musicId"] == music_id), None)
if music_entry is None:
# If the entry doesn't exist, create a new entry
music_entry = {
"musicId": music_id,
"length": 0,
"userRivalMusicDetailList": []
}
user_rival_music_list.append(music_entry)
# Check if the current score is higher than the previous highest score for the level
level_entry = next(
(
entry
for entry in music_entry["userRivalMusicDetailList"]
if entry["level"] == level
),
None,
)
if level_entry is None or score > level_entry["scoreMax"]:
# If the level entry doesn't exist or the score is higher, update or add the entry
level_entry = {
"level": level,
"scoreMax": score,
"scoreRank": rank
}
if level_entry not in music_entry["userRivalMusicDetailList"]:
music_entry["userRivalMusicDetailList"].append(level_entry)
music_entry["length"] = len(music_entry["userRivalMusicDetailList"])
result = {
"userId": data["userId"],
"rivalId": data["rivalId"],
"nextIndex": -1,
"userRivalMusicList": user_rival_music_list
}
return result
def handle_get_user_rival_music_api_requestded(self, data: Dict) -> Dict:
m = self.data.score.get_rival_music(data["rivalId"], data["nextIndex"], data["maxCount"])
if m is None:
return {}
userRivalMusicList = []
for music in m:
self.logger.debug(music["point"])
return {
"userId": data["userId"],
"rivalId": data["rivalId"],
"nextIndex": -1
}
def handle_get_user_favorite_item_api_request(self, data: Dict) -> Dict:
user_fav_item_list = []
# still needs to be implemented on WebUI
# 1: Music, 3: Character
# 1: Music, 2: User, 3: Character
fav_list = self.data.item.get_all_favorites(
data["userId"], self.version, fav_kind=int(data["kind"])
)
@@ -600,25 +687,43 @@ class ChuniBase:
}
def handle_get_user_team_api_request(self, data: Dict) -> Dict:
# TODO: use the database "chuni_profile_team" with a GUI
# Default values
team_id = 65535
team_name = self.game_cfg.team.team_name
if team_name == "":
team_rank = 0
# Get user profile
profile = self.data.profile.get_profile_data(data["userId"], self.version)
if profile and profile["teamId"]:
# Get team by id
team = self.data.profile.get_team_by_id(profile["teamId"])
if team:
team_id = team["id"]
team_name = team["teamName"]
# Determine whether to use scaled ranks, or original system
if self.game_cfg.team.rank_scale:
team_rank = self.data.profile.get_team_rank(team["id"])
else:
team_rank = self.data.profile.get_team_rank_actual(team["id"])
# Don't return anything if no team name has been defined for defaults and there is no team set for the player
if not profile["teamId"] and team_name == "":
return {"userId": data["userId"], "teamId": 0}
return {
"userId": data["userId"],
"teamId": 1,
"teamRank": 1,
"teamId": team_id,
"teamRank": team_rank,
"teamName": team_name,
"userTeamPoint": {
"userId": data["userId"],
"teamId": 1,
"teamId": team_id,
"orderId": 1,
"teamPoint": 1,
"aggrDate": data["playDate"],
},
}
def handle_get_team_course_setting_api_request(self, data: Dict) -> Dict:
return {
"userId": data["userId"],
@@ -709,9 +814,25 @@ class ChuniBase:
self.data.score.put_playlog(user_id, playlog)
if "userTeamPoint" in upsert:
# TODO: team stuff
pass
team_points = upsert["userTeamPoint"]
try:
for tp in team_points:
if tp["teamId"] != '65535':
# Fetch the current team data
current_team = self.data.profile.get_team_by_id(tp["teamId"])
# Calculate the new teamPoint
new_team_point = int(tp["teamPoint"]) + current_team["teamPoint"]
# Prepare the data to update
team_data = {
"teamPoint": new_team_point
}
# Update the team data
self.data.profile.update_team(tp["teamId"], team_data)
except:
pass # Probably a better way to catch if the team is not set yet (new profiles), but let's just pass
if "userMapAreaList" in upsert:
for map_area in upsert["userMapAreaList"]:
self.data.item.put_map_area(user_id, map_area)
@@ -757,4 +878,4 @@ class ChuniBase:
return {
"userId": data["userId"],
"userNetBattleData": {"recentNBSelectMusicList": []},
}
}

View File

@@ -30,6 +30,11 @@ class ChuniTeamConfig:
return CoreConfig.get_config_field(
self.__config, "chuni", "team", "name", default=""
)
@property
def rank_scale(self) -> str:
return CoreConfig.get_config_field(
self.__config, "chuni", "team", "rank_scale", default="False"
)
class ChuniModsConfig:

View File

@@ -637,3 +637,103 @@ class ChuniProfileData(BaseData):
if result is None:
return None
return result.fetchall()
def get_team_by_id(self, team_id: int) -> Optional[Row]:
sql = select(team).where(team.c.id == team_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_team_rank_actual(self, team_id: int) -> int:
# Normal ranking system, likely the one used in the real servers
# Query all teams sorted by 'teamPoint'
result = self.execute(
select(team.c.id).order_by(team.c.teamPoint.desc())
)
# Get the rank of the team with the given team_id
rank = None
for i, row in enumerate(result, start=1):
if row.id == team_id:
rank = i
break
# Return the rank if found, or a default rank otherwise
return rank if rank is not None else 0
def get_team_rank(self, team_id: int) -> int:
# Scaled ranking system, designed for smaller instances.
# Query all teams sorted by 'teamPoint'
result = self.execute(
select(team.c.id).order_by(team.c.teamPoint.desc())
)
# Count total number of teams
total_teams = self.execute(select(func.count()).select_from(team)).scalar()
# Get the rank of the team with the given team_id
rank = None
for i, row in enumerate(result, start=1):
if row.id == team_id:
rank = i
break
# If the team is not found, return default rank
if rank is None:
return 0
# Define rank tiers
tiers = {
1: range(1, int(total_teams * 0.1) + 1), # Rainbow
2: range(int(total_teams * 0.1) + 1, int(total_teams * 0.4) + 1), # Gold
3: range(int(total_teams * 0.4) + 1, int(total_teams * 0.7) + 1), # Silver
4: range(int(total_teams * 0.7) + 1, total_teams + 1), # Grey
}
# Assign rank based on tier
for tier_rank, tier_range in tiers.items():
if rank in tier_range:
return tier_rank
# Return default rank if not found in any tier
return 0
def update_team(self, team_id: int, team_data: Dict) -> bool:
team_data["id"] = team_id
sql = insert(team).values(**team_data)
conflict = sql.on_duplicate_key_update(**team_data)
result = self.execute(conflict)
if result is None:
self.logger.warn(
f"update_team: Failed to update team! team id: {team_id}"
)
return False
return True
def get_rival(self, rival_id: int) -> Optional[Row]:
sql = select(profile).where(profile.c.user == rival_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def get_overview(self) -> Dict:
# Fetch and add up all the playcounts
playcount_sql = self.execute(select(profile.c.playCount))
if playcount_sql is None:
self.logger.warn(
f"get_overview: Couldn't pull playcounts"
)
return 0
total_play_count = 0;
for row in playcount_sql:
total_play_count += row[0]
return {
"total_play_count": total_play_count
}

View File

@@ -200,3 +200,10 @@ class ChuniScoreData(BaseData):
if result is None:
return None
return result.lastrowid
def get_rival_music(self, rival_id: int, index: int, max_count: int) -> Optional[List[Dict]]:
sql = select(playlog).where(playlog.c.user == rival_id).limit(max_count).offset(index)
result = self.execute(sql)
if result is None:
return None
return result.fetchall()

View File

@@ -453,6 +453,15 @@ class ChuniStaticData(BaseData):
return None
return result.fetchone()
def get_song(self, music_id: int) -> Optional[Row]:
sql = music.select(music.c.id == music_id)
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
def put_avatar(
self,
version: int,
@@ -587,4 +596,4 @@ class ChuniStaticData(BaseData):
result = self.execute(sql)
if result is None:
return None
return result.fetchone()
return result.fetchone()