maimai: Initial Festival support

This commit is contained in:
Dniel97
2023-04-10 18:58:19 +02:00
parent 7fdb3e8222
commit f63dd07937
14 changed files with 347 additions and 198 deletions

View File

@@ -1,5 +1,5 @@
from datetime import datetime, date, timedelta
from typing import Dict
from typing import Any, Dict
import logging
from core.config import CoreConfig
@@ -52,6 +52,7 @@ class Mai2Base:
events = self.data.static.get_enabled_events(self.version)
events_lst = []
if events is None:
self.logger.warn("No enabled events, did you run the reader?")
return {"type": data["type"], "length": 0, "gameEventList": []}
for event in events:
@@ -59,7 +60,11 @@ class Mai2Base:
{
"type": event["type"],
"id": event["eventId"],
"startDate": "2017-12-05 07:00:00.0",
# actually use the startDate from the import so it
# properly shows all the events when new ones are imported
"startDate": datetime.strftime(
event["startDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
),
"endDate": "2099-12-31 00:00:00.0",
}
)
@@ -79,12 +84,12 @@ class Mai2Base:
return {"length": 0, "gameChargeList": []}
charge_list = []
for x in range(len(game_charge_list)):
for i, charge in enumerate(game_charge_list):
charge_list.append(
{
"orderId": x,
"chargeId": game_charge_list[x]["ticketId"],
"price": game_charge_list[x]["price"],
"orderId": i,
"chargeId": charge["ticketId"],
"price": charge["price"],
"startDate": "2017-12-05 07:00:00.0",
"endDate": "2099-12-31 00:00:00.0",
}
@@ -167,6 +172,20 @@ class Mai2Base:
self.data.score.put_playlog(user_id, playlog)
def handle_upsert_user_chargelog_api_request(self, data: Dict) -> Dict:
user_id = data["userId"]
charge = data["userCharge"]
# remove the ".0" from the date string, festival only?
charge["purchaseDate"] = charge["purchaseDate"].replace(".0", "")
self.data.item.put_charge(
user_id,
charge["chargeId"],
charge["stock"],
datetime.strptime(charge["purchaseDate"], Mai2Constants.DATE_TIME_FORMAT),
datetime.strptime(charge["validDate"], Mai2Constants.DATE_TIME_FORMAT),
)
def handle_upsert_user_all_api_request(self, data: Dict) -> Dict:
user_id = data["userId"]
upsert = data["upsertUserAll"]
@@ -204,15 +223,21 @@ class Mai2Base:
if "userChargeList" in upsert and len(upsert["userChargeList"]) > 0:
for charge in upsert["userChargeList"]:
# remove the ".0" from the date string, festival only?
charge["purchaseDate"] = charge["purchaseDate"].replace(".0", "")
self.data.item.put_charge(
user_id,
charge["chargeId"],
charge["stock"],
datetime.strptime(charge["purchaseDate"], "%Y-%m-%d %H:%M:%S"),
datetime.strptime(charge["validDate"], "%Y-%m-%d %H:%M:%S")
datetime.strptime(
charge["purchaseDate"], Mai2Constants.DATE_TIME_FORMAT
),
datetime.strptime(
charge["validDate"], Mai2Constants.DATE_TIME_FORMAT
),
)
if upsert["isNewCharacterList"] and int(upsert["isNewCharacterList"]) > 0:
if "userCharacterList" in upsert and len(upsert["userCharacterList"]) > 0:
for char in upsert["userCharacterList"]:
self.data.item.put_character(
user_id,
@@ -222,7 +247,7 @@ class Mai2Base:
char["useCount"],
)
if upsert["isNewItemList"] and int(upsert["isNewItemList"]) > 0:
if "userItemList" in upsert and len(upsert["userItemList"]) > 0:
for item in upsert["userItemList"]:
self.data.item.put_item(
user_id,
@@ -232,7 +257,7 @@ class Mai2Base:
item["isValid"],
)
if upsert["isNewLoginBonusList"] and int(upsert["isNewLoginBonusList"]) > 0:
if "userLoginBonusList" in upsert and len(upsert["userLoginBonusList"]) > 0:
for login_bonus in upsert["userLoginBonusList"]:
self.data.item.put_login_bonus(
user_id,
@@ -242,7 +267,7 @@ class Mai2Base:
login_bonus["isComplete"],
)
if upsert["isNewMapList"] and int(upsert["isNewMapList"]) > 0:
if "userMapList" in upsert and len(upsert["userMapList"]) > 0:
for map in upsert["userMapList"]:
self.data.item.put_map(
user_id,
@@ -253,21 +278,27 @@ class Mai2Base:
map["isComplete"],
)
if upsert["isNewMusicDetailList"] and int(upsert["isNewMusicDetailList"]) > 0:
if "userMusicDetailList" in upsert and len(upsert["userMusicDetailList"]) > 0:
for music in upsert["userMusicDetailList"]:
self.data.score.put_best_score(user_id, music)
if upsert["isNewCourseList"] and int(upsert["isNewCourseList"]) > 0:
if "userCourseList" in upsert and len(upsert["userCourseList"]) > 0:
for course in upsert["userCourseList"]:
self.data.score.put_course(user_id, course)
if upsert["isNewFavoriteList"] and int(upsert["isNewFavoriteList"]) > 0:
if "userFavoriteList" in upsert and len(upsert["userFavoriteList"]) > 0:
for fav in upsert["userFavoriteList"]:
self.data.item.put_favorite(user_id, fav["kind"], fav["itemIdList"])
# if "isNewFriendSeasonRankingList" in upsert and int(upsert["isNewFriendSeasonRankingList"]) > 0:
# for fsr in upsert["userFriendSeasonRankingList"]:
# pass
if (
"userFriendSeasonRankingList" in upsert
and len(upsert["userFriendSeasonRankingList"]) > 0
):
for fsr in upsert["userFriendSeasonRankingList"]:
fsr["recordDate"] = (
datetime.strptime(fsr["recordDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"),
)
self.data.item.put_friend_season_ranking(user_id, fsr)
def handle_user_logout_api_request(self, data: Dict) -> Dict:
pass
@@ -311,11 +342,7 @@ class Mai2Base:
def handle_get_user_card_api_request(self, data: Dict) -> Dict:
user_cards = self.data.item.get_cards(data["userId"])
if user_cards is None:
return {
"userId": data["userId"],
"nextIndex": 0,
"userCardList": []
}
return {"userId": data["userId"], "nextIndex": 0, "userCardList": []}
max_ct = data["maxCount"]
next_idx = data["nextIndex"]
@@ -333,25 +360,23 @@ class Mai2Base:
tmp.pop("id")
tmp.pop("user")
tmp["startDate"] = datetime.strftime(
tmp["startDate"], "%Y-%m-%d %H:%M:%S")
tmp["startDate"], Mai2Constants.DATE_TIME_FORMAT
)
tmp["endDate"] = datetime.strftime(
tmp["endDate"], "%Y-%m-%d %H:%M:%S")
tmp["endDate"], Mai2Constants.DATE_TIME_FORMAT
)
card_list.append(tmp)
return {
"userId": data["userId"],
"nextIndex": next_idx,
"userCardList": card_list[start_idx:end_idx]
"userCardList": card_list[start_idx:end_idx],
}
def handle_get_user_charge_api_request(self, data: Dict) -> Dict:
user_charges = self.data.item.get_charges(data["userId"])
if user_charges is None:
return {
"userId": data["userId"],
"length": 0,
"userChargeList": []
}
return {"userId": data["userId"], "length": 0, "userChargeList": []}
user_charge_list = []
for charge in user_charges:
@@ -359,45 +384,46 @@ class Mai2Base:
tmp.pop("id")
tmp.pop("user")
tmp["purchaseDate"] = datetime.strftime(
tmp["purchaseDate"], "%Y-%m-%d %H:%M:%S")
tmp["purchaseDate"], Mai2Constants.DATE_TIME_FORMAT
)
tmp["validDate"] = datetime.strftime(
tmp["validDate"], "%Y-%m-%d %H:%M:%S")
tmp["validDate"], Mai2Constants.DATE_TIME_FORMAT
)
user_charge_list.append(tmp)
return {
"userId": data["userId"],
"length": len(user_charge_list),
"userChargeList": user_charge_list
"userChargeList": user_charge_list,
}
def handle_get_user_item_api_request(self, data: Dict) -> Dict:
kind = int(data["nextIndex"] / 10000000000)
next_idx = int(data["nextIndex"] % 10000000000)
user_items = self.data.item.get_items(data["userId"], kind)
user_item_list = []
next_idx = 0
user_item_list = self.data.item.get_items(data["userId"], kind)
for x in range(next_idx, data["maxCount"]):
try:
user_item_list.append({
"itemKind": user_items[x]["itemKind"],
"itemId": user_items[x]["itemId"],
"stock": user_items[x]["stock"],
"isValid": user_items[x]["isValid"]
})
except IndexError:
items: list[Dict[str, Any]] = []
for i in range(next_idx, len(user_item_list)):
tmp = user_item_list[i]._asdict()
tmp.pop("user")
tmp.pop("id")
items.append(tmp)
if len(items) >= int(data["maxCount"]):
break
if len(user_item_list) == data["maxCount"]:
next_idx = data["nextIndex"] + data["maxCount"] + 1
break
xout = kind * 10000000000 + next_idx + len(items)
if len(items) < int(data["maxCount"]):
next_idx = 0
else:
next_idx = xout
return {
"userId": data["userId"],
"nextIndex": next_idx,
"itemKind": kind,
"userItemList": user_item_list
"userItemList": items,
}
def handle_get_user_character_api_request(self, data: Dict) -> Dict:
@@ -479,21 +505,12 @@ class Mai2Base:
tmp.pop("user")
mlst.append(tmp)
return {
"userActivity": {
"playList": plst,
"musicList": mlst
}
}
return {"userActivity": {"playList": plst, "musicList": mlst}}
def handle_get_user_course_api_request(self, data: Dict) -> Dict:
user_courses = self.data.score.get_courses(data["userId"])
if user_courses is None:
return {
"userId": data["userId"],
"nextIndex": 0,
"userCourseList": []
}
return {"userId": data["userId"], "nextIndex": 0, "userCourseList": []}
course_list = []
for course in user_courses:
@@ -502,11 +519,7 @@ class Mai2Base:
tmp.pop("id")
course_list.append(tmp)
return {
"userId": data["userId"],
"nextIndex": 0,
"userCourseList": course_list
}
return {"userId": data["userId"], "nextIndex": 0, "userCourseList": course_list}
def handle_get_user_portrait_api_request(self, data: Dict) -> Dict:
# No support for custom pfps
@@ -514,96 +527,103 @@ class Mai2Base:
def handle_get_user_friend_season_ranking_api_request(self, data: Dict) -> Dict:
friend_season_ranking = self.data.item.get_friend_season_ranking(data["userId"])
friend_season_ranking_list = []
next_index = 0
if friend_season_ranking is None:
return {
"userId": data["userId"],
"nextIndex": 0,
"userFriendSeasonRankingList": [],
}
for x in range(data["nextIndex"], data["maxCount"] + data["nextIndex"]):
try:
friend_season_ranking_list.append(
{
"mapId": friend_season_ranking_list[x]["map_id"],
"distance": friend_season_ranking_list[x]["distance"],
"isLock": friend_season_ranking_list[x]["is_lock"],
"isClear": friend_season_ranking_list[x]["is_clear"],
"isComplete": friend_season_ranking_list[x]["is_complete"],
}
)
except:
friend_season_ranking_list = []
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
for x in range(next_idx, len(friend_season_ranking)):
tmp = friend_season_ranking[x]._asdict()
tmp.pop("user")
tmp.pop("id")
tmp["recordDate"] = datetime.strftime(
tmp["recordDate"], f"{Mai2Constants.DATE_TIME_FORMAT}.0"
)
friend_season_ranking_list.append(tmp)
if len(friend_season_ranking_list) >= max_ct:
break
# We're capped and still have some left to go
if (
len(friend_season_ranking_list) == data["maxCount"]
and len(friend_season_ranking) > data["maxCount"] + data["nextIndex"]
):
next_index = data["maxCount"] + data["nextIndex"]
if len(friend_season_ranking) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = 0
return {
"userId": data["userId"],
"nextIndex": next_index,
"nextIndex": next_idx,
"userFriendSeasonRankingList": friend_season_ranking_list,
}
def handle_get_user_map_api_request(self, data: Dict) -> Dict:
maps = self.data.item.get_maps(data["userId"])
map_list = []
next_index = 0
if maps is None:
return {
"userId": data["userId"],
"nextIndex": 0,
"userMapList": [],
}
for x in range(data["nextIndex"], data["maxCount"] + data["nextIndex"]):
try:
map_list.append(
{
"mapId": maps[x]["map_id"],
"distance": maps[x]["distance"],
"isLock": maps[x]["is_lock"],
"isClear": maps[x]["is_clear"],
"isComplete": maps[x]["is_complete"],
}
)
except:
map_list = []
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
for x in range(next_idx, len(maps)):
tmp = maps[x]._asdict()
tmp.pop("user")
tmp.pop("id")
map_list.append(tmp)
if len(map_list) >= max_ct:
break
# We're capped and still have some left to go
if (
len(map_list) == data["maxCount"]
and len(maps) > data["maxCount"] + data["nextIndex"]
):
next_index = data["maxCount"] + data["nextIndex"]
if len(maps) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = 0
return {
"userId": data["userId"],
"nextIndex": next_index,
"nextIndex": next_idx,
"userMapList": map_list,
}
def handle_get_user_login_bonus_api_request(self, data: Dict) -> Dict:
login_bonuses = self.data.item.get_login_bonuses(data["userId"])
login_bonus_list = []
next_index = 0
if login_bonuses is None:
return {
"userId": data["userId"],
"nextIndex": 0,
"userLoginBonusList": [],
}
for x in range(data["nextIndex"], data["maxCount"] + data["nextIndex"]):
try:
login_bonus_list.append(
{
"bonusId": login_bonuses[x]["bonus_id"],
"point": login_bonuses[x]["point"],
"isCurrent": login_bonuses[x]["is_current"],
"isComplete": login_bonuses[x]["is_complete"],
}
)
except:
login_bonus_list = []
next_idx = int(data["nextIndex"])
max_ct = int(data["maxCount"])
for x in range(next_idx, len(login_bonuses)):
tmp = login_bonuses[x]._asdict()
tmp.pop("user")
tmp.pop("id")
login_bonus_list.append(tmp)
if len(login_bonus_list) >= max_ct:
break
# We're capped and still have some left to go
if (
len(login_bonus_list) == data["maxCount"]
and len(login_bonuses) > data["maxCount"] + data["nextIndex"]
):
next_index = data["maxCount"] + data["nextIndex"]
if len(login_bonuses) >= next_idx + max_ct:
next_idx += max_ct
else:
next_idx = 0
return {
"userId": data["userId"],
"nextIndex": next_index,
"nextIndex": next_idx,
"userLoginBonusList": login_bonus_list,
}
@@ -629,5 +649,5 @@ class Mai2Base:
return {
"userId": data["userId"],
"nextIndex": next_index,
"userMusicList": [{"userMusicDetailList": music_detail_list}]
"userMusicList": [{"userMusicDetailList": music_detail_list}],
}