let black do it's magic

This commit is contained in:
Hay1tsme
2023-03-09 11:38:58 -05:00
parent fa7206848c
commit a76bb94eb1
150 changed files with 8474 additions and 4843 deletions

View File

@@ -7,4 +7,4 @@ index = CxbServlet
database = CxbData
reader = CxbReader
game_codes = [CxbConstants.GAME_CODE]
current_schema_version = 1
current_schema_version = 1

View File

@@ -11,82 +11,91 @@ from titles.cxb.config import CxbConfig
from titles.cxb.const import CxbConstants
from titles.cxb.database import CxbData
class CxbBase():
class CxbBase:
def __init__(self, cfg: CoreConfig, game_cfg: CxbConfig) -> None:
self.config = cfg # Config file
self.config = cfg # Config file
self.game_config = game_cfg
self.data = CxbData(cfg) # Database
self.data = CxbData(cfg) # Database
self.game = CxbConstants.GAME_CODE
self.logger = logging.getLogger("cxb")
self.version = CxbConstants.VER_CROSSBEATS_REV
def handle_action_rpreq_request(self, data: Dict) -> Dict:
return({})
return {}
def handle_action_hitreq_request(self, data: Dict) -> Dict:
return({"data":[]})
return {"data": []}
def handle_auth_usercheck_request(self, data: Dict) -> Dict:
profile = self.data.profile.get_profile_index(0, data["usercheck"]["authid"], self.version)
profile = self.data.profile.get_profile_index(
0, data["usercheck"]["authid"], self.version
)
if profile is not None:
self.logger.info(f"User {data['usercheck']['authid']} has CXB profile")
return({"exist": "true", "logout": "true"})
return {"exist": "true", "logout": "true"}
self.logger.info(f"No profile for aime id {data['usercheck']['authid']}")
return({"exist": "false", "logout": "true"})
return {"exist": "false", "logout": "true"}
def handle_auth_entry_request(self, data: Dict) -> Dict:
self.logger.info(f"New profile for {data['entry']['authid']}")
return({"token": data["entry"]["authid"], "uid": data["entry"]["authid"]})
return {"token": data["entry"]["authid"], "uid": data["entry"]["authid"]}
def handle_auth_login_request(self, data: Dict) -> Dict:
profile = self.data.profile.get_profile_index(0, data["login"]["authid"], self.version)
profile = self.data.profile.get_profile_index(
0, data["login"]["authid"], self.version
)
if profile is not None:
self.logger.info(f"Login user {data['login']['authid']}")
return({"token": data["login"]["authid"], "uid": data["login"]["authid"]})
return {"token": data["login"]["authid"], "uid": data["login"]["authid"]}
self.logger.warn(f"User {data['login']['authid']} does not have a profile")
return({})
return {}
def handle_action_loadrange_request(self, data: Dict) -> Dict:
range_start = data['loadrange']['range'][0]
range_end = data['loadrange']['range'][1]
uid = data['loadrange']['uid']
range_start = data["loadrange"]["range"][0]
range_end = data["loadrange"]["range"][1]
uid = data["loadrange"]["uid"]
self.logger.info(f"Load data for {uid}")
profile = self.data.profile.get_profile(uid, self.version)
songs = self.data.score.get_best_scores(uid)
songs = self.data.score.get_best_scores(uid)
data1 = []
index = []
versionindex = []
for profile_index in profile:
profile_data = profile_index["data"]
if int(range_start) == 800000:
return({"index":range_start, "data":[], "version":10400})
if not ( int(range_start) <= int(profile_index[3]) <= int(range_end) ):
return {"index": range_start, "data": [], "version": 10400}
if not (int(range_start) <= int(profile_index[3]) <= int(range_end)):
continue
#Prevent loading of the coupons within the profile to use the force unlock instead
# Prevent loading of the coupons within the profile to use the force unlock instead
elif 500 <= int(profile_index[3]) <= 510:
continue
#Prevent loading of songs saved in the profile
# Prevent loading of songs saved in the profile
elif 100000 <= int(profile_index[3]) <= 110000:
continue
#Prevent loading of the shop list / unlocked titles & icons saved in the profile
# Prevent loading of the shop list / unlocked titles & icons saved in the profile
elif 200000 <= int(profile_index[3]) <= 210000:
continue
#Prevent loading of stories in the profile
# Prevent loading of stories in the profile
elif 900000 <= int(profile_index[3]) <= 900200:
continue
else:
index.append(profile_index[3])
data1.append(b64encode(bytes(json.dumps(profile_data, separators=(',', ':')), 'utf-8')).decode('utf-8'))
data1.append(
b64encode(
bytes(json.dumps(profile_data, separators=(",", ":")), "utf-8")
).decode("utf-8")
)
'''
"""
100000 = Songs
200000 = Shop
300000 = Courses
@@ -96,101 +105,140 @@ class CxbBase():
700000 = rcLog
800000 = Partners
900000 = Stories
'''
"""
# Coupons
for i in range(500,510):
for i in range(500, 510):
index.append(str(i))
couponid = int(i) - 500
dataValue = [{
"couponId":str(couponid),
"couponNum":"1",
"couponLog":[],
}]
data1.append(b64encode(bytes(json.dumps(dataValue[0], separators=(',', ':')), 'utf-8')).decode('utf-8'))
dataValue = [
{
"couponId": str(couponid),
"couponNum": "1",
"couponLog": [],
}
]
data1.append(
b64encode(
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
# ShopList_Title
for i in range(200000,201451):
for i in range(200000, 201451):
index.append(str(i))
shopid = int(i) - 200000
dataValue = [{
"shopId":shopid,
"shopState":"2",
"isDisable":"t",
"isDeleted":"f",
"isSpecialFlag":"f"
}]
data1.append(b64encode(bytes(json.dumps(dataValue[0], separators=(',', ':')), 'utf-8')).decode('utf-8'))
dataValue = [
{
"shopId": shopid,
"shopState": "2",
"isDisable": "t",
"isDeleted": "f",
"isSpecialFlag": "f",
}
]
data1.append(
b64encode(
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
#ShopList_Icon
for i in range(202000,202264):
# ShopList_Icon
for i in range(202000, 202264):
index.append(str(i))
shopid = int(i) - 200000
dataValue = [{
"shopId":shopid,
"shopState":"2",
"isDisable":"t",
"isDeleted":"f",
"isSpecialFlag":"f"
}]
data1.append(b64encode(bytes(json.dumps(dataValue[0], separators=(',', ':')), 'utf-8')).decode('utf-8'))
dataValue = [
{
"shopId": shopid,
"shopState": "2",
"isDisable": "t",
"isDeleted": "f",
"isSpecialFlag": "f",
}
]
data1.append(
b64encode(
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
#Stories
for i in range(900000,900003):
# Stories
for i in range(900000, 900003):
index.append(str(i))
storyid = int(i) - 900000
dataValue = [{
"storyId":storyid,
"unlockState1":["t"] * 10,
"unlockState2":["t"] * 10,
"unlockState3":["t"] * 10,
"unlockState4":["t"] * 10,
"unlockState5":["t"] * 10,
"unlockState6":["t"] * 10,
"unlockState7":["t"] * 10,
"unlockState8":["t"] * 10,
"unlockState9":["t"] * 10,
"unlockState10":["t"] * 10,
"unlockState11":["t"] * 10,
"unlockState12":["t"] * 10,
"unlockState13":["t"] * 10,
"unlockState14":["t"] * 10,
"unlockState15":["t"] * 10,
"unlockState16":["t"] * 10
}]
data1.append(b64encode(bytes(json.dumps(dataValue[0], separators=(',', ':')), 'utf-8')).decode('utf-8'))
dataValue = [
{
"storyId": storyid,
"unlockState1": ["t"] * 10,
"unlockState2": ["t"] * 10,
"unlockState3": ["t"] * 10,
"unlockState4": ["t"] * 10,
"unlockState5": ["t"] * 10,
"unlockState6": ["t"] * 10,
"unlockState7": ["t"] * 10,
"unlockState8": ["t"] * 10,
"unlockState9": ["t"] * 10,
"unlockState10": ["t"] * 10,
"unlockState11": ["t"] * 10,
"unlockState12": ["t"] * 10,
"unlockState13": ["t"] * 10,
"unlockState14": ["t"] * 10,
"unlockState15": ["t"] * 10,
"unlockState16": ["t"] * 10,
}
]
data1.append(
b64encode(
bytes(json.dumps(dataValue[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
for song in songs:
song_data = song["data"]
songCode = []
songCode.append({
"mcode": song_data['mcode'],
"musicState": song_data['musicState'],
"playCount": song_data['playCount'],
"totalScore": song_data['totalScore'],
"highScore": song_data['highScore'],
"everHighScore": song_data['everHighScore'] if 'everHighScore' in song_data else ["0","0","0","0","0"],
"clearRate": song_data['clearRate'],
"rankPoint": song_data['rankPoint'],
"normalCR": song_data['normalCR'] if 'normalCR' in song_data else ["0","0","0","0","0"],
"survivalCR": song_data['survivalCR'] if 'survivalCR' in song_data else ["0","0","0","0","0"],
"ultimateCR": song_data['ultimateCR'] if 'ultimateCR' in song_data else ["0","0","0","0","0"],
"nohopeCR": song_data['nohopeCR'] if 'nohopeCR' in song_data else ["0","0","0","0","0"],
"combo": song_data['combo'],
"coupleUserId": song_data['coupleUserId'],
"difficulty": song_data['difficulty'],
"isFullCombo": song_data['isFullCombo'],
"clearGaugeType": song_data['clearGaugeType'],
"fieldType": song_data['fieldType'],
"gameType": song_data['gameType'],
"grade": song_data['grade'],
"unlockState": song_data['unlockState'],
"extraState": song_data['extraState']
})
index.append(song_data['index'])
data1.append(b64encode(bytes(json.dumps(songCode[0], separators=(',', ':')), 'utf-8')).decode('utf-8'))
songCode.append(
{
"mcode": song_data["mcode"],
"musicState": song_data["musicState"],
"playCount": song_data["playCount"],
"totalScore": song_data["totalScore"],
"highScore": song_data["highScore"],
"everHighScore": song_data["everHighScore"]
if "everHighScore" in song_data
else ["0", "0", "0", "0", "0"],
"clearRate": song_data["clearRate"],
"rankPoint": song_data["rankPoint"],
"normalCR": song_data["normalCR"]
if "normalCR" in song_data
else ["0", "0", "0", "0", "0"],
"survivalCR": song_data["survivalCR"]
if "survivalCR" in song_data
else ["0", "0", "0", "0", "0"],
"ultimateCR": song_data["ultimateCR"]
if "ultimateCR" in song_data
else ["0", "0", "0", "0", "0"],
"nohopeCR": song_data["nohopeCR"]
if "nohopeCR" in song_data
else ["0", "0", "0", "0", "0"],
"combo": song_data["combo"],
"coupleUserId": song_data["coupleUserId"],
"difficulty": song_data["difficulty"],
"isFullCombo": song_data["isFullCombo"],
"clearGaugeType": song_data["clearGaugeType"],
"fieldType": song_data["fieldType"],
"gameType": song_data["gameType"],
"grade": song_data["grade"],
"unlockState": song_data["unlockState"],
"extraState": song_data["extraState"],
}
)
index.append(song_data["index"])
data1.append(
b64encode(
bytes(json.dumps(songCode[0], separators=(",", ":")), "utf-8")
).decode("utf-8")
)
for v in index:
try:
@@ -198,66 +246,81 @@ class CxbBase():
v_profile_data = v_profile["data"]
versionindex.append(int(v_profile_data["appVersion"]))
except:
versionindex.append('10400')
versionindex.append("10400")
return({"index":index, "data":data1, "version":versionindex})
return {"index": index, "data": data1, "version": versionindex}
def handle_action_saveindex_request(self, data: Dict) -> Dict:
save_data = data['saveindex']
save_data = data["saveindex"]
try:
#REV Omnimix Version Fetcher
gameversion = data['saveindex']['data'][0][2]
# REV Omnimix Version Fetcher
gameversion = data["saveindex"]["data"][0][2]
self.logger.warning(f"Game Version is {gameversion}")
except:
pass
if "10205" in gameversion:
self.logger.info(f"Saving CrossBeats REV profile for {data['saveindex']['uid']}")
#Alright.... time to bring the jank code
for value in data['saveindex']['data']:
if 'playedUserId' in value[1]:
self.data.profile.put_profile(data['saveindex']['uid'], self.version, value[0], value[1])
if 'mcode' not in value[1]:
self.data.profile.put_profile(data['saveindex']['uid'], self.version, value[0], value[1])
if 'shopId' in value:
continue
if 'mcode' in value[1] and 'musicState' in value[1]:
song_json = json.loads(value[1])
songCode = []
songCode.append({
"mcode": song_json['mcode'],
"musicState": song_json['musicState'],
"playCount": song_json['playCount'],
"totalScore": song_json['totalScore'],
"highScore": song_json['highScore'],
"clearRate": song_json['clearRate'],
"rankPoint": song_json['rankPoint'],
"combo": song_json['combo'],
"coupleUserId": song_json['coupleUserId'],
"difficulty": song_json['difficulty'],
"isFullCombo": song_json['isFullCombo'],
"clearGaugeType": song_json['clearGaugeType'],
"fieldType": song_json['fieldType'],
"gameType": song_json['gameType'],
"grade": song_json['grade'],
"unlockState": song_json['unlockState'],
"extraState": song_json['extraState'],
"index": value[0]
})
self.data.score.put_best_score(data['saveindex']['uid'], song_json['mcode'], self.version, value[0], songCode[0])
return({})
else:
self.logger.info(f"Saving CrossBeats REV Sunrise profile for {data['saveindex']['uid']}")
#Sunrise
if "10205" in gameversion:
self.logger.info(
f"Saving CrossBeats REV profile for {data['saveindex']['uid']}"
)
# Alright.... time to bring the jank code
for value in data["saveindex"]["data"]:
if "playedUserId" in value[1]:
self.data.profile.put_profile(
data["saveindex"]["uid"], self.version, value[0], value[1]
)
if "mcode" not in value[1]:
self.data.profile.put_profile(
data["saveindex"]["uid"], self.version, value[0], value[1]
)
if "shopId" in value:
continue
if "mcode" in value[1] and "musicState" in value[1]:
song_json = json.loads(value[1])
songCode = []
songCode.append(
{
"mcode": song_json["mcode"],
"musicState": song_json["musicState"],
"playCount": song_json["playCount"],
"totalScore": song_json["totalScore"],
"highScore": song_json["highScore"],
"clearRate": song_json["clearRate"],
"rankPoint": song_json["rankPoint"],
"combo": song_json["combo"],
"coupleUserId": song_json["coupleUserId"],
"difficulty": song_json["difficulty"],
"isFullCombo": song_json["isFullCombo"],
"clearGaugeType": song_json["clearGaugeType"],
"fieldType": song_json["fieldType"],
"gameType": song_json["gameType"],
"grade": song_json["grade"],
"unlockState": song_json["unlockState"],
"extraState": song_json["extraState"],
"index": value[0],
}
)
self.data.score.put_best_score(
data["saveindex"]["uid"],
song_json["mcode"],
self.version,
value[0],
songCode[0],
)
return {}
else:
self.logger.info(
f"Saving CrossBeats REV Sunrise profile for {data['saveindex']['uid']}"
)
# Sunrise
try:
profileIndex = save_data['index'].index('0')
profileIndex = save_data["index"].index("0")
except:
return({"data":""}) #Maybe
return {"data": ""} # Maybe
profile = json.loads(save_data["data"][profileIndex])
aimeId = profile["aimeId"]
@@ -265,65 +328,91 @@ class CxbBase():
for index, value in enumerate(data["saveindex"]["data"]):
if int(data["saveindex"]["index"][index]) == 101:
self.data.profile.put_profile(aimeId, self.version, data["saveindex"]["index"][index], value)
if int(data["saveindex"]["index"][index]) >= 700000 and int(data["saveindex"]["index"][index])<= 701000:
self.data.profile.put_profile(aimeId, self.version, data["saveindex"]["index"][index], value)
if int(data["saveindex"]["index"][index]) >= 500 and int(data["saveindex"]["index"][index]) <= 510:
self.data.profile.put_profile(aimeId, self.version, data["saveindex"]["index"][index], value)
if 'playedUserId' in value:
self.data.profile.put_profile(aimeId, self.version, data["saveindex"]["index"][index], json.loads(value))
if 'mcode' not in value and "normalCR" not in value:
self.data.profile.put_profile(aimeId, self.version, data["saveindex"]["index"][index], json.loads(value))
if 'shopId' in value:
self.data.profile.put_profile(
aimeId, self.version, data["saveindex"]["index"][index], value
)
if (
int(data["saveindex"]["index"][index]) >= 700000
and int(data["saveindex"]["index"][index]) <= 701000
):
self.data.profile.put_profile(
aimeId, self.version, data["saveindex"]["index"][index], value
)
if (
int(data["saveindex"]["index"][index]) >= 500
and int(data["saveindex"]["index"][index]) <= 510
):
self.data.profile.put_profile(
aimeId, self.version, data["saveindex"]["index"][index], value
)
if "playedUserId" in value:
self.data.profile.put_profile(
aimeId,
self.version,
data["saveindex"]["index"][index],
json.loads(value),
)
if "mcode" not in value and "normalCR" not in value:
self.data.profile.put_profile(
aimeId,
self.version,
data["saveindex"]["index"][index],
json.loads(value),
)
if "shopId" in value:
continue
# MusicList Index for the profile
indexSongList = []
for value in data["saveindex"]["index"]:
if int(value) in range(100000,110000):
if int(value) in range(100000, 110000):
indexSongList.append(value)
for index, value in enumerate(data["saveindex"]["data"]):
if 'mcode' not in value:
if "mcode" not in value:
continue
if 'playedUserId' in value:
if "playedUserId" in value:
continue
data1 = json.loads(value)
songCode = []
songCode.append({
"mcode": data1['mcode'],
"musicState": data1['musicState'],
"playCount": data1['playCount'],
"totalScore": data1['totalScore'],
"highScore": data1['highScore'],
"everHighScore": data1['everHighScore'],
"clearRate": data1['clearRate'],
"rankPoint": data1['rankPoint'],
"normalCR": data1['normalCR'],
"survivalCR": data1['survivalCR'],
"ultimateCR": data1['ultimateCR'],
"nohopeCR": data1['nohopeCR'],
"combo": data1['combo'],
"coupleUserId": data1['coupleUserId'],
"difficulty": data1['difficulty'],
"isFullCombo": data1['isFullCombo'],
"clearGaugeType": data1['clearGaugeType'],
"fieldType": data1['fieldType'],
"gameType": data1['gameType'],
"grade": data1['grade'],
"unlockState": data1['unlockState'],
"extraState": data1['extraState'],
"index": indexSongList[i]
})
songCode.append(
{
"mcode": data1["mcode"],
"musicState": data1["musicState"],
"playCount": data1["playCount"],
"totalScore": data1["totalScore"],
"highScore": data1["highScore"],
"everHighScore": data1["everHighScore"],
"clearRate": data1["clearRate"],
"rankPoint": data1["rankPoint"],
"normalCR": data1["normalCR"],
"survivalCR": data1["survivalCR"],
"ultimateCR": data1["ultimateCR"],
"nohopeCR": data1["nohopeCR"],
"combo": data1["combo"],
"coupleUserId": data1["coupleUserId"],
"difficulty": data1["difficulty"],
"isFullCombo": data1["isFullCombo"],
"clearGaugeType": data1["clearGaugeType"],
"fieldType": data1["fieldType"],
"gameType": data1["gameType"],
"grade": data1["grade"],
"unlockState": data1["unlockState"],
"extraState": data1["extraState"],
"index": indexSongList[i],
}
)
self.data.score.put_best_score(aimeId, data1['mcode'], self.version, indexSongList[i], songCode[0])
self.data.score.put_best_score(
aimeId, data1["mcode"], self.version, indexSongList[i], songCode[0]
)
i += 1
return({})
return {}
def handle_action_sprankreq_request(self, data: Dict) -> Dict:
uid = data['sprankreq']['uid']
uid = data["sprankreq"]["uid"]
self.logger.info(f"Get best rankings for {uid}")
p = self.data.score.get_best_rankings(uid)
@@ -331,90 +420,122 @@ class CxbBase():
for rank in p:
if rank["song_id"] is not None:
rankList.append({
"sc": [rank["score"],rank["song_id"]],
"rid": rank["rev_id"],
"clear": rank["clear"]
})
rankList.append(
{
"sc": [rank["score"], rank["song_id"]],
"rid": rank["rev_id"],
"clear": rank["clear"],
}
)
else:
rankList.append({
"sc": [rank["score"]],
"rid": rank["rev_id"],
"clear": rank["clear"]
})
rankList.append(
{
"sc": [rank["score"]],
"rid": rank["rev_id"],
"clear": rank["clear"],
}
)
return({
return {
"uid": data["sprankreq"]["uid"],
"aid": data["sprankreq"]["aid"],
"rank": rankList,
"rankx":[1,1,1]
})
"rankx": [1, 1, 1],
}
def handle_action_getadv_request(self, data: Dict) -> Dict:
return({"data":[{"r":"1","i":"100300","c":"20"}]})
return {"data": [{"r": "1", "i": "100300", "c": "20"}]}
def handle_action_getmsg_request(self, data: Dict) -> Dict:
return({"msgs":[]})
return {"msgs": []}
def handle_auth_logout_request(self, data: Dict) -> Dict:
return({"auth":True})
def handle_action_rankreg_request(self, data: Dict) -> Dict:
uid = data['rankreg']['uid']
return {"auth": True}
def handle_action_rankreg_request(self, data: Dict) -> Dict:
uid = data["rankreg"]["uid"]
self.logger.info(f"Put {len(data['rankreg']['data'])} rankings for {uid}")
for rid in data['rankreg']['data']:
#REV S2
for rid in data["rankreg"]["data"]:
# REV S2
if "clear" in rid:
try:
self.data.score.put_ranking(user_id=uid, rev_id=int(rid["rid"]), song_id=int(rid["sc"][1]), score=int(rid["sc"][0]), clear=rid["clear"])
self.data.score.put_ranking(
user_id=uid,
rev_id=int(rid["rid"]),
song_id=int(rid["sc"][1]),
score=int(rid["sc"][0]),
clear=rid["clear"],
)
except:
self.data.score.put_ranking(user_id=uid, rev_id=int(rid["rid"]), song_id=0, score=int(rid["sc"][0]), clear=rid["clear"])
#REV
self.data.score.put_ranking(
user_id=uid,
rev_id=int(rid["rid"]),
song_id=0,
score=int(rid["sc"][0]),
clear=rid["clear"],
)
# REV
else:
try:
self.data.score.put_ranking(user_id=uid, rev_id=int(rid["rid"]), song_id=int(rid["sc"][1]), score=int(rid["sc"][0]), clear=0)
self.data.score.put_ranking(
user_id=uid,
rev_id=int(rid["rid"]),
song_id=int(rid["sc"][1]),
score=int(rid["sc"][0]),
clear=0,
)
except:
self.data.score.put_ranking(user_id=uid, rev_id=int(rid["rid"]), song_id=0, score=int(rid["sc"][0]), clear=0)
return({})
self.data.score.put_ranking(
user_id=uid,
rev_id=int(rid["rid"]),
song_id=0,
score=int(rid["sc"][0]),
clear=0,
)
return {}
def handle_action_addenergy_request(self, data: Dict) -> Dict:
uid = data['addenergy']['uid']
uid = data["addenergy"]["uid"]
self.logger.info(f"Add energy to user {uid}")
profile = self.data.profile.get_profile_index(0, uid, self.version)
data1 = profile["data"]
p = self.data.item.get_energy(uid)
energy = p["energy"]
if not p:
if not p:
self.data.item.put_energy(uid, 5)
return({
return {
"class": data1["myClass"],
"granted": "5",
"total": "5",
"threshold": "1000"
})
"threshold": "1000",
}
array = []
newenergy = int(energy) + 5
self.data.item.put_energy(uid, newenergy)
if int(energy) <= 995:
array.append({
"class": data1["myClass"],
"granted": "5",
"total": str(energy),
"threshold": "1000"
})
array.append(
{
"class": data1["myClass"],
"granted": "5",
"total": str(energy),
"threshold": "1000",
}
)
else:
array.append({
"class": data1["myClass"],
"granted": "0",
"total": str(energy),
"threshold": "1000"
})
array.append(
{
"class": data1["myClass"],
"granted": "0",
"total": str(energy),
"threshold": "1000",
}
)
return array[0]
def handle_action_eventreq_request(self, data: Dict) -> Dict:

View File

@@ -1,40 +1,60 @@
from core.config import CoreConfig
class CxbServerConfig():
class CxbServerConfig:
def __init__(self, parent_config: "CxbConfig"):
self.__config = parent_config
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(self.__config, 'cxb', 'server', 'enable', default=True)
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "enable", default=True
)
@property
def loglevel(self) -> int:
return CoreConfig.str_to_loglevel(CoreConfig.get_config_field(self.__config, 'cxb', 'server', 'loglevel', default="info"))
return CoreConfig.str_to_loglevel(
CoreConfig.get_config_field(
self.__config, "cxb", "server", "loglevel", default="info"
)
)
@property
def hostname(self) -> str:
return CoreConfig.get_config_field(self.__config, 'cxb', 'server', 'hostname', default="localhost")
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "hostname", default="localhost"
)
@property
def ssl_enable(self) -> bool:
return CoreConfig.get_config_field(self.__config, 'cxb', 'server', 'ssl_enable', default=False)
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "ssl_enable", default=False
)
@property
def port(self) -> int:
return CoreConfig.get_config_field(self.__config, 'cxb', 'server', 'port', default=8082)
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "port", default=8082
)
@property
def port_secure(self) -> int:
return CoreConfig.get_config_field(self.__config, 'cxb', 'server', 'port_secure', default=443)
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "port_secure", default=443
)
@property
def ssl_cert(self) -> str:
return CoreConfig.get_config_field(self.__config, 'cxb', 'server', 'ssl_cert', default="cert/title.crt")
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "ssl_cert", default="cert/title.crt"
)
@property
def ssl_key(self) -> str:
return CoreConfig.get_config_field(self.__config, 'cxb', 'server', 'ssl_key', default="cert/title.key")
return CoreConfig.get_config_field(
self.__config, "cxb", "server", "ssl_key", default="cert/title.key"
)
class CxbConfig(dict):
def __init__(self) -> None:

View File

@@ -1,4 +1,4 @@
class CxbConstants():
class CxbConstants:
GAME_CODE = "SDCA"
CONFIG_NAME = "cxb.yaml"
@@ -8,8 +8,13 @@ class CxbConstants():
VER_CROSSBEATS_REV_SUNRISE_S2 = 2
VER_CROSSBEATS_REV_SUNRISE_S2_OMNI = 3
VERSION_NAMES = ("crossbeats REV.", "crossbeats REV. SUNRISE", "crossbeats REV. SUNRISE S2", "crossbeats REV. SUNRISE S2 Omnimix")
VERSION_NAMES = (
"crossbeats REV.",
"crossbeats REV. SUNRISE",
"crossbeats REV. SUNRISE S2",
"crossbeats REV. SUNRISE S2 Omnimix",
)
@classmethod
def game_ver_to_string(cls, ver: int):
return cls.VERSION_NAMES[ver]
return cls.VERSION_NAMES[ver]

View File

@@ -1,8 +1,8 @@
from core.data import Data
from core.config import CoreConfig
from titles.cxb.schema import CxbProfileData, CxbScoreData, CxbItemData, CxbStaticData
class CxbData(Data):
def __init__(self, cfg: CoreConfig) -> None:
super().__init__(cfg)

View File

@@ -17,6 +17,7 @@ from titles.cxb.rev import CxbRev
from titles.cxb.rss1 import CxbRevSunriseS1
from titles.cxb.rss2 import CxbRevSunriseS2
class CxbServlet(resource.Resource):
def __init__(self, core_cfg: CoreConfig, cfg_dir: str) -> None:
self.isLeaf = True
@@ -24,62 +25,85 @@ class CxbServlet(resource.Resource):
self.core_cfg = core_cfg
self.game_cfg = CxbConfig()
if path.exists(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}"):
self.game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}")))
self.game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}"))
)
self.logger = logging.getLogger("cxb")
if not hasattr(self.logger, "inited"):
log_fmt_str = "[%(asctime)s] CXB | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler("{0}/{1}.log".format(self.core_cfg.server.log_dir, "cxb"), encoding='utf8',
when="d", backupCount=10)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.core_cfg.server.log_dir, "cxb"),
encoding="utf8",
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.game_cfg.server.loglevel)
coloredlogs.install(level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str)
coloredlogs.install(
level=self.game_cfg.server.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.inited = True
self.versions = [
CxbRev(core_cfg, self.game_cfg),
CxbRevSunriseS1(core_cfg, self.game_cfg),
CxbRevSunriseS2(core_cfg, self.game_cfg),
]
@classmethod
def get_allnet_info(cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str) -> Tuple[bool, str, str]:
def get_allnet_info(
cls, game_code: str, core_cfg: CoreConfig, cfg_dir: str
) -> Tuple[bool, str, str]:
game_cfg = CxbConfig()
if path.exists(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}"):
game_cfg.update(yaml.safe_load(open(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}")))
game_cfg.update(
yaml.safe_load(open(f"{cfg_dir}/{CxbConstants.CONFIG_NAME}"))
)
if not game_cfg.server.enable:
return (False, "", "")
if core_cfg.server.is_develop:
return (True, f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/", "")
return (
True,
f"http://{core_cfg.title.hostname}:{core_cfg.title.port}/{game_code}/$v/",
"",
)
return (True, f"http://{core_cfg.title.hostname}/{game_code}/$v/", "")
def setup(self):
if self.game_cfg.server.enable:
endpoints.serverFromString(reactor, f"tcp:{self.game_cfg.server.port}:interface={self.core_cfg.server.listen_address}")\
.listen(server.Site(CxbServlet(self.core_cfg, self.cfg_dir)))
if self.core_cfg.server.is_develop and self.game_cfg.server.ssl_enable:
endpoints.serverFromString(reactor, f"ssl:{self.game_cfg.server.port_secure}"\
f":interface={self.core_cfg.server.listen_address}:privateKey={self.game_cfg.server.ssl_key}:"\
f"certKey={self.game_cfg.server.ssl_cert}")\
.listen(server.Site(CxbServlet(self.core_cfg, self.cfg_dir)))
endpoints.serverFromString(
reactor,
f"tcp:{self.game_cfg.server.port}:interface={self.core_cfg.server.listen_address}",
).listen(server.Site(CxbServlet(self.core_cfg, self.cfg_dir)))
self.logger.info(f"Crossbeats title server ready on port {self.game_cfg.server.port} & {self.game_cfg.server.port_secure}")
if self.core_cfg.server.is_develop and self.game_cfg.server.ssl_enable:
endpoints.serverFromString(
reactor,
f"ssl:{self.game_cfg.server.port_secure}"
f":interface={self.core_cfg.server.listen_address}:privateKey={self.game_cfg.server.ssl_key}:"
f"certKey={self.game_cfg.server.ssl_cert}",
).listen(server.Site(CxbServlet(self.core_cfg, self.cfg_dir)))
self.logger.info(
f"Crossbeats title server ready on port {self.game_cfg.server.port} & {self.game_cfg.server.port_secure}"
)
else:
self.logger.info(f"Crossbeats title server ready on port {self.game_cfg.server.port}")
self.logger.info(
f"Crossbeats title server ready on port {self.game_cfg.server.port}"
)
def render_POST(self, request: Request):
version = 0
@@ -96,21 +120,28 @@ class CxbServlet(resource.Resource):
except Exception as e:
try:
req_json: Dict = json.loads(req_bytes.decode().replace('"', '\\"').replace("'", '"'))
req_json: Dict = json.loads(
req_bytes.decode().replace('"', '\\"').replace("'", '"')
)
except Exception as f:
self.logger.warn(f"Error decoding json: {e} / {f} - {req_url} - {req_bytes}")
self.logger.warn(
f"Error decoding json: {e} / {f} - {req_url} - {req_bytes}"
)
return b""
if req_json == {}:
self.logger.warn(f"Empty json request to {req_url}")
return b""
cmd = url_split[len(url_split) - 1]
subcmd = list(req_json.keys())[0]
if subcmd == "dldate":
if not type(req_json["dldate"]) is dict or "filetype" not in req_json["dldate"]:
if (
not type(req_json["dldate"]) is dict
or "filetype" not in req_json["dldate"]
):
self.logger.warn(f"Malformed dldate request: {req_url} {req_json}")
return b""
@@ -119,7 +150,9 @@ class CxbServlet(resource.Resource):
version = int(filetype_split[0])
filetype_inflect_split = inflection.underscore(filetype).split("/")
match = re.match("^([A-Za-z]*)(\d\d\d\d)$", filetype_split[len(filetype_split) - 1])
match = re.match(
"^([A-Za-z]*)(\d\d\d\d)$", filetype_split[len(filetype_split) - 1]
)
if match:
subcmd = f"{inflection.underscore(match.group(1))}xxxx"
else:
@@ -128,7 +161,7 @@ class CxbServlet(resource.Resource):
filetype = subcmd
func_to_find = f"handle_{cmd}_{subcmd}_request"
if version <= 10102:
version_string = "Rev"
internal_ver = CxbConstants.VER_CROSSBEATS_REV
@@ -136,28 +169,28 @@ class CxbServlet(resource.Resource):
elif version == 10113 or version == 10103:
version_string = "Rev SunriseS1"
internal_ver = CxbConstants.VER_CROSSBEATS_REV_SUNRISE_S1
elif version >= 10114 or version == 10104:
version_string = "Rev SunriseS2"
internal_ver = CxbConstants.VER_CROSSBEATS_REV_SUNRISE_S2
else:
version_string = "Base"
self.logger.info(f"{version_string} Request {req_url} -> {filetype}")
self.logger.debug(req_json)
try:
handler = getattr(self.versions[internal_ver], func_to_find)
resp = handler(req_json)
except AttributeError as e:
except AttributeError as e:
self.logger.warning(f"Unhandled {version_string} request {req_url} - {e}")
resp = {}
except Exception as e:
self.logger.error(f"Error handling {version_string} method {req_url} - {e}")
raise
self.logger.debug(f"{version_string} Response {resp}")
self.logger.debug(f"{version_string} Response {resp}")
return json.dumps(resp, ensure_ascii=False).encode("utf-8")

View File

@@ -8,13 +8,23 @@ from core.config import CoreConfig
from titles.cxb.database import CxbData
from titles.cxb.const import CxbConstants
class CxbReader(BaseReader):
def __init__(self, config: CoreConfig, version: int, bin_arg: Optional[str], opt_arg: Optional[str], extra: Optional[str]) -> None:
def __init__(
self,
config: CoreConfig,
version: int,
bin_arg: Optional[str],
opt_arg: Optional[str],
extra: Optional[str],
) -> None:
super().__init__(config, version, bin_arg, opt_arg, extra)
self.data = CxbData(config)
try:
self.logger.info(f"Start importer for {CxbConstants.game_ver_to_string(version)}")
self.logger.info(
f"Start importer for {CxbConstants.game_ver_to_string(version)}"
)
except IndexError:
self.logger.error(f"Invalid project cxb version {version}")
exit(1)
@@ -28,7 +38,7 @@ class CxbReader(BaseReader):
if pull_bin_ram:
self.read_csv(f"{self.bin_dir}")
def read_csv(self, bin_dir: str) -> None:
self.logger.info(f"Read csv from {bin_dir}")
@@ -45,18 +55,73 @@ class CxbReader(BaseReader):
if not "N/A" in row["standard"]:
self.logger.info(f"Added song {song_id} chart 0")
self.data.static.put_music(self.version, song_id, index, 0, title, artist, genre, int(row["standard"].replace("Standard ","").replace("N/A","0")))
self.data.static.put_music(
self.version,
song_id,
index,
0,
title,
artist,
genre,
int(
row["standard"]
.replace("Standard ", "")
.replace("N/A", "0")
),
)
if not "N/A" in row["hard"]:
self.logger.info(f"Added song {song_id} chart 1")
self.data.static.put_music(self.version, song_id, index, 1, title, artist, genre, int(row["hard"].replace("Hard ","").replace("N/A","0")))
self.data.static.put_music(
self.version,
song_id,
index,
1,
title,
artist,
genre,
int(row["hard"].replace("Hard ", "").replace("N/A", "0")),
)
if not "N/A" in row["master"]:
self.logger.info(f"Added song {song_id} chart 2")
self.data.static.put_music(self.version, song_id, index, 2, title, artist, genre, int(row["master"].replace("Master ","").replace("N/A","0")))
self.data.static.put_music(
self.version,
song_id,
index,
2,
title,
artist,
genre,
int(
row["master"].replace("Master ", "").replace("N/A", "0")
),
)
if not "N/A" in row["unlimited"]:
self.logger.info(f"Added song {song_id} chart 3")
self.data.static.put_music(self.version, song_id, index, 3, title, artist, genre, int(row["unlimited"].replace("Unlimited ","").replace("N/A","0")))
self.data.static.put_music(
self.version,
song_id,
index,
3,
title,
artist,
genre,
int(
row["unlimited"]
.replace("Unlimited ", "")
.replace("N/A", "0")
),
)
if not "N/A" in row["easy"]:
self.logger.info(f"Added song {song_id} chart 4")
self.data.static.put_music(self.version, song_id, index, 4, title, artist, genre, int(row["easy"].replace("Easy ","").replace("N/A","0")))
self.data.static.put_music(
self.version,
song_id,
index,
4,
title,
artist,
genre,
int(row["easy"].replace("Easy ", "").replace("N/A", "0")),
)
except:
self.logger.warn(f"Couldn't read csv file in {self.bin_dir}, skipping")

View File

@@ -11,155 +11,191 @@ from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
class CxbRev(CxbBase):
def __init__(self, cfg: CoreConfig, game_cfg: CxbConfig) -> None:
super().__init__(cfg, game_cfg)
self.version = CxbConstants.VER_CROSSBEATS_REV
def handle_data_path_list_request(self, data: Dict) -> Dict:
return { "data": "" }
return {"data": ""}
def handle_data_putlog_request(self, data: Dict) -> Dict:
if data["putlog"]["type"] == "ResultLog":
score_data = json.loads(data["putlog"]["data"])
userid = score_data['usid']
userid = score_data["usid"]
self.data.score.put_playlog(userid, score_data['mcode'], score_data['difficulty'], score_data["score"], int(Decimal(score_data["clearrate"]) * 100), score_data["flawless"], score_data["super"], score_data["cool"], score_data["fast"], score_data["fast2"], score_data["slow"], score_data["slow2"], score_data["fail"], score_data["combo"])
return({"data":True})
return {"data": True }
self.data.score.put_playlog(
userid,
score_data["mcode"],
score_data["difficulty"],
score_data["score"],
int(Decimal(score_data["clearrate"]) * 100),
score_data["flawless"],
score_data["super"],
score_data["cool"],
score_data["fast"],
score_data["fast2"],
score_data["slow"],
score_data["slow2"],
score_data["fail"],
score_data["combo"],
)
return {"data": True}
return {"data": True}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/MusicArchiveList.csv") as music:
lines = music.readlines()
for line in lines:
line_split = line.split(',')
line_split = line.split(",")
ret_str += f"{line_split[0]},{line_split[1]},{line_split[2]},{line_split[3]},{line_split[4]},{line_split[5]},{line_split[6]},{line_split[7]},{line_split[8]},{line_split[9]},{line_split[10]},{line_split[11]},{line_split[12]},{line_split[13]},{line_split[14]},\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_item_list_icon_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListIcon\r\n"
with open(r"titles/cxb/rev_data/Item/ItemArchiveList_Icon.csv", encoding="utf-8") as item:
with open(
r"titles/cxb/rev_data/Item/ItemArchiveList_Icon.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_item_list_skin_notes_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListSkinNotes\r\n"
with open(r"titles/cxb/rev_data/Item/ItemArchiveList_SkinNotes.csv", encoding="utf-8") as item:
with open(
r"titles/cxb/rev_data/Item/ItemArchiveList_SkinNotes.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_item_list_skin_effect_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListSkinEffect\r\n"
with open(r"titles/cxb/rev_data/Item/ItemArchiveList_SkinEffect.csv", encoding="utf-8") as item:
with open(
r"titles/cxb/rev_data/Item/ItemArchiveList_SkinEffect.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_item_list_skin_bg_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListSkinBg\r\n"
with open(r"titles/cxb/rev_data/Item/ItemArchiveList_SkinBg.csv", encoding="utf-8") as item:
with open(
r"titles/cxb/rev_data/Item/ItemArchiveList_SkinBg.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_item_list_title_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ItemListTitle\r\n"
with open(r"titles/cxb/rev_data/Item/ItemList_Title.csv", encoding="shift-jis") as item:
with open(
r"titles/cxb/rev_data/Item/ItemList_Title.csv", encoding="shift-jis"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_shop_list_music_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ShopListMusic\r\n"
with open(r"titles/cxb/rev_data/Shop/ShopList_Music.csv", encoding="shift-jis") as shop:
with open(
r"titles/cxb/rev_data/Shop/ShopList_Music.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_shop_list_icon_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ShopListIcon\r\n"
with open(r"titles/cxb/rev_data/Shop/ShopList_Icon.csv", encoding="shift-jis") as shop:
with open(
r"titles/cxb/rev_data/Shop/ShopList_Icon.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_shop_list_title_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ShopListTitle\r\n"
with open(r"titles/cxb/rev_data/Shop/ShopList_Title.csv", encoding="shift-jis") as shop:
with open(
r"titles/cxb/rev_data/Shop/ShopList_Title.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
def handle_data_shop_list_skin_hud_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_shop_list_skin_arrow_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_shop_list_skin_hit_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_shop_list_skin_hud_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_shop_list_skin_arrow_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_shop_list_skin_hit_request(self, data: Dict) -> Dict:
return {"data": ""}
@cached(lifetime=86400)
def handle_data_shop_list_sale_request(self, data: Dict) -> Dict:
ret_str = "\r\n#ShopListSale\r\n"
with open(r"titles/cxb/rev_data/Shop/ShopList_Sale.csv", encoding="shift-jis") as shop:
with open(
r"titles/cxb/rev_data/Shop/ShopList_Sale.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
def handle_data_extra_stage_list_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_exxxxx_request(self, data: Dict) -> Dict:
extra_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
with open(fr"titles/cxb/rev_data/Ex000{extra_num}.csv", encoding="shift-jis") as stage:
with open(
rf"titles/cxb/rev_data/Ex000{extra_num}.csv", encoding="shift-jis"
) as stage:
lines = stage.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
def handle_data_bonus_list10100_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_free_coupon_request(self, data: Dict) -> Dict:
return({"data": ""})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_bonus_list10100_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_free_coupon_request(self, data: Dict) -> Dict:
return {"data": ""}
@cached(lifetime=86400)
def handle_data_news_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/NewsList.csv", encoding="UTF-8") as news:
lines = news.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
def handle_data_tips_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
@cached(lifetime=86400)
def handle_data_license_request(self, data: Dict) -> Dict:
ret_str = ""
@@ -167,90 +203,104 @@ class CxbRev(CxbBase):
lines = lic.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_course_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/Course/CourseList.csv", encoding="UTF-8") as course:
with open(
r"titles/cxb/rev_data/Course/CourseList.csv", encoding="UTF-8"
) as course:
lines = course.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_csxxxx_request(self, data: Dict) -> Dict:
# Removed the CSVs since the format isnt quite right
extra_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
with open(fr"titles/cxb/rev_data/Course/Cs000{extra_num}.csv", encoding="shift-jis") as course:
with open(
rf"titles/cxb/rev_data/Course/Cs000{extra_num}.csv", encoding="shift-jis"
) as course:
lines = course.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_mission_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/MissionList.csv", encoding="shift-jis") as mission:
with open(
r"titles/cxb/rev_data/MissionList.csv", encoding="shift-jis"
) as mission:
lines = mission.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
def handle_data_mission_bonus_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_unlimited_mission_request(self, data: Dict) -> Dict:
return({"data": ""})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_mission_bonus_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_unlimited_mission_request(self, data: Dict) -> Dict:
return {"data": ""}
@cached(lifetime=86400)
def handle_data_event_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/Event/EventArchiveList.csv", encoding="shift-jis") as mission:
with open(
r"titles/cxb/rev_data/Event/EventArchiveList.csv", encoding="shift-jis"
) as mission:
lines = mission.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
def handle_data_event_music_list_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_event_mission_list_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_event_achievement_single_high_score_list_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_event_achievement_single_accumulation_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_event_ranking_high_score_list_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_event_ranking_accumulation_list_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_event_ranking_stamp_list_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_event_ranking_store_list_request(self, data: Dict) -> Dict:
return({"data": ""})
def handle_data_event_ranking_area_list_request(self, data: Dict) -> Dict:
return({"data": ""})
return {"data": ""}
@cached(lifetime=86400)
def handle_data_event_mission_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_event_achievement_single_high_score_list_request(
self, data: Dict
) -> Dict:
return {"data": ""}
def handle_data_event_achievement_single_accumulation_request(
self, data: Dict
) -> Dict:
return {"data": ""}
def handle_data_event_ranking_high_score_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_event_ranking_accumulation_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_event_ranking_stamp_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_event_ranking_store_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_event_ranking_area_list_request(self, data: Dict) -> Dict:
return {"data": ""}
@cached(lifetime=86400)
def handle_data_event_stamp_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rev_data/Event/EventStampList.csv", encoding="shift-jis") as event:
with open(
r"titles/cxb/rev_data/Event/EventStampList.csv", encoding="shift-jis"
) as event:
lines = event.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
def handle_data_event_stamp_map_list_csxxxx_request(self, data: Dict) -> Dict:
return({"data": "1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"})
return {"data": "1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"}
def handle_data_server_state_request(self, data: Dict) -> Dict:
return({"data": True})
return {"data": True}

View File

@@ -11,128 +11,147 @@ from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
class CxbRevSunriseS1(CxbBase):
def __init__(self, cfg: CoreConfig, game_cfg: CxbConfig) -> None:
super().__init__(cfg, game_cfg)
self.version = CxbConstants.VER_CROSSBEATS_REV_SUNRISE_S1
def handle_data_path_list_request(self, data: Dict) -> Dict:
return { "data": "" }
@cached(lifetime=86400)
def handle_data_path_list_request(self, data: Dict) -> Dict:
return {"data": ""}
@cached(lifetime=86400)
def handle_data_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss1_data/MusicArchiveList.csv") as music:
lines = music.readlines()
for line in lines:
line_split = line.split(',')
line_split = line.split(",")
ret_str += f"{line_split[0]},{line_split[1]},{line_split[2]},{line_split[3]},{line_split[4]},{line_split[5]},{line_split[6]},{line_split[7]},{line_split[8]},{line_split[9]},{line_split[10]},{line_split[11]},{line_split[12]},{line_split[13]},{line_split[14]},\r\n"
return({"data":ret_str})
@cached(lifetime=86400)
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_item_list_detail_request(self, data: Dict) -> Dict:
#ItemListIcon load
# ItemListIcon load
ret_str = "#ItemListIcon\r\n"
with open(r"titles/cxb/rss1_data/Item/ItemList_Icon.csv", encoding="shift-jis") as item:
with open(
r"titles/cxb/rss1_data/Item/ItemList_Icon.csv", encoding="shift-jis"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ItemListTitle load
# ItemListTitle load
ret_str += "\r\n#ItemListTitle\r\n"
with open(r"titles/cxb/rss1_data/Item/ItemList_Title.csv", encoding="shift-jis") as item:
with open(
r"titles/cxb/rss1_data/Item/ItemList_Title.csv", encoding="shift-jis"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_shop_list_detail_request(self, data: Dict) -> Dict:
#ShopListIcon load
# ShopListIcon load
ret_str = "#ShopListIcon\r\n"
with open(r"titles/cxb/rss1_data/Shop/ShopList_Icon.csv", encoding="utf-8") as shop:
with open(
r"titles/cxb/rss1_data/Shop/ShopList_Icon.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListMusic load
ret_str += "\r\n#ShopListMusic\r\n"
with open(r"titles/cxb/rss1_data/Shop/ShopList_Music.csv", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListSale load
ret_str += "\r\n#ShopListSale\r\n"
with open(r"titles/cxb/rss1_data/Shop/ShopList_Sale.csv", encoding="shift-jis") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListSkinBg load
ret_str += "\r\n#ShopListSkinBg\r\n"
with open(r"titles/cxb/rss1_data/Shop/ShopList_SkinBg.csv", encoding="shift-jis") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListSkinEffect load
ret_str += "\r\n#ShopListSkinEffect\r\n"
with open(r"titles/cxb/rss1_data/Shop/ShopList_SkinEffect.csv", encoding="shift-jis") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListSkinNotes load
ret_str += "\r\n#ShopListSkinNotes\r\n"
with open(r"titles/cxb/rss1_data/Shop/ShopList_SkinNotes.csv", encoding="shift-jis") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListTitle load
ret_str += "\r\n#ShopListTitle\r\n"
with open(r"titles/cxb/rss1_data/Shop/ShopList_Title.csv", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
def handle_data_extra_stage_list_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_ex0001_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_one_more_extra_list_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_bonus_list10100_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_oe0001_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_free_coupon_request(self, data: Dict) -> Dict:
return({"data":""})
@cached(lifetime=86400)
# ShopListMusic load
ret_str += "\r\n#ShopListMusic\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_Music.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListSale load
ret_str += "\r\n#ShopListSale\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_Sale.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListSkinBg load
ret_str += "\r\n#ShopListSkinBg\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_SkinBg.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListSkinEffect load
ret_str += "\r\n#ShopListSkinEffect\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_SkinEffect.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListSkinNotes load
ret_str += "\r\n#ShopListSkinNotes\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_SkinNotes.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListTitle load
ret_str += "\r\n#ShopListTitle\r\n"
with open(
r"titles/cxb/rss1_data/Shop/ShopList_Title.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return {"data": ret_str}
def handle_data_extra_stage_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_ex0001_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_one_more_extra_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_bonus_list10100_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_oe0001_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_free_coupon_request(self, data: Dict) -> Dict:
return {"data": ""}
@cached(lifetime=86400)
def handle_data_news_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss1_data/NewsList.csv", encoding="UTF-8") as news:
lines = news.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
def handle_data_tips_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_release_info_list_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
@cached(lifetime=86400)
def handle_data_random_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
@@ -141,10 +160,12 @@ class CxbRevSunriseS1(CxbBase):
count = 0
for line in lines:
line_split = line.split(",")
ret_str += str(count) + "," + line_split[0] + "," + line_split[0] + ",\r\n"
ret_str += (
str(count) + "," + line_split[0] + "," + line_split[0] + ",\r\n"
)
return {"data": ret_str}
return({"data":ret_str})
@cached(lifetime=86400)
def handle_data_license_request(self, data: Dict) -> Dict:
ret_str = ""
@@ -152,54 +173,58 @@ class CxbRevSunriseS1(CxbBase):
lines = licenses.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_course_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss1_data/Course/CourseList.csv", encoding="UTF-8") as course:
with open(
r"titles/cxb/rss1_data/Course/CourseList.csv", encoding="UTF-8"
) as course:
lines = course.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_csxxxx_request(self, data: Dict) -> Dict:
extra_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
with open(fr"titles/cxb/rss1_data/Course/Cs{extra_num}.csv", encoding="shift-jis") as course:
with open(
rf"titles/cxb/rss1_data/Course/Cs{extra_num}.csv", encoding="shift-jis"
) as course:
lines = course.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
def handle_data_mission_list_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_mission_bonus_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_unlimited_mission_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_partner_list_request(self, data: Dict) -> Dict:
ret_str = ""
# Lord forgive me for the sins I am about to commit
for i in range(0,10):
for i in range(0, 10):
ret_str += f"80000{i},{i},{i},0,10000,,\r\n"
ret_str += f"80000{i},{i},{i},1,10500,,\r\n"
ret_str += f"80000{i},{i},{i},2,10500,,\r\n"
for i in range(10,13):
for i in range(10, 13):
ret_str += f"8000{i},{i},{i},0,10000,,\r\n"
ret_str += f"8000{i},{i},{i},1,10500,,\r\n"
ret_str += f"8000{i},{i},{i},2,10500,,\r\n"
ret_str +="\r\n---\r\n0,150,100,100,100,100,\r\n"
for i in range(1,130):
ret_str +=f"{i},100,100,100,100,100,\r\n"
ret_str += "\r\n---\r\n0,150,100,100,100,100,\r\n"
for i in range(1, 130):
ret_str += f"{i},100,100,100,100,100,\r\n"
ret_str += "---\r\n"
return({"data": ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_partnerxxxx_request(self, data: Dict) -> Dict:
partner_num = int(data["dldate"]["filetype"][-4:])
@@ -208,50 +233,54 @@ class CxbRevSunriseS1(CxbBase):
lines = partner.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data": ret_str})
return {"data": ret_str}
def handle_data_server_state_request(self, data: Dict) -> Dict:
return({"data": True})
return {"data": True}
def handle_data_settings_request(self, data: Dict) -> Dict:
return({"data": "2,\r\n"})
return {"data": "2,\r\n"}
def handle_data_story_list_request(self, data: Dict) -> Dict:
#story id, story name, game version, start time, end time, course arc, unlock flag, song mcode for menu
# story id, story name, game version, start time, end time, course arc, unlock flag, song mcode for menu
ret_str = "\r\n"
ret_str += f"st0000,RISING PURPLE,10104,1464370990,4096483201,Cs1000,-1,purple,\r\n"
ret_str += f"st0001,REBEL YELL,10104,1467999790,4096483201,Cs1000,-1,chaset,\r\n"
ret_str += (
f"st0000,RISING PURPLE,10104,1464370990,4096483201,Cs1000,-1,purple,\r\n"
)
ret_str += (
f"st0001,REBEL YELL,10104,1467999790,4096483201,Cs1000,-1,chaset,\r\n"
)
ret_str += f"st0002,REMNANT,10104,1502127790,4096483201,Cs1000,-1,overcl,\r\n"
return({"data": ret_str})
return {"data": ret_str}
def handle_data_stxxxx_request(self, data: Dict) -> Dict:
story_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
for i in range(1,11):
ret_str +=f"{i},st000{story_num}_{i-1},,,,,,,,,,,,,,,,1,,-1,1,\r\n"
return({"data": ret_str})
for i in range(1, 11):
ret_str += f"{i},st000{story_num}_{i-1},,,,,,,,,,,,,,,,1,,-1,1,\r\n"
return {"data": ret_str}
def handle_data_event_stamp_list_request(self, data: Dict) -> Dict:
return({"data":"Cs1032,1,1,1,1,1,1,1,1,1,1,\r\n"})
return {"data": "Cs1032,1,1,1,1,1,1,1,1,1,1,\r\n"}
def handle_data_premium_list_request(self, data: Dict) -> Dict:
return({"data": "1,,,,10,,,,,99,,,,,,,,,100,,\r\n"})
return {"data": "1,,,,10,,,,,99,,,,,,,,,100,,\r\n"}
def handle_data_event_list_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_event_detail_list_request(self, data: Dict) -> Dict:
event_id = data["dldate"]["filetype"].split("/")[2]
if "EventStampMapListCs1002" in event_id:
return({"data":"1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"})
return {"data": "1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"}
elif "EventStampList" in event_id:
return({"data":"Cs1002,1,1,1,1,1,1,1,1,1,1,\r\n"})
return {"data": "Cs1002,1,1,1,1,1,1,1,1,1,1,\r\n"}
else:
return({"data":""})
return {"data": ""}
def handle_data_event_stamp_map_list_csxxxx_request(self, data: Dict) -> Dict:
event_id = data["dldate"]["filetype"].split("/")[2]
if "EventStampMapListCs1002" in event_id:
return({"data":"1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"})
return {"data": "1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"}
else:
return({"data":""})
return {"data": ""}

View File

@@ -11,128 +11,147 @@ from titles.cxb.config import CxbConfig
from titles.cxb.base import CxbBase
from titles.cxb.const import CxbConstants
class CxbRevSunriseS2(CxbBase):
def __init__(self, cfg: CoreConfig, game_cfg: CxbConfig) -> None:
super().__init__(cfg, game_cfg)
self.version = CxbConstants.VER_CROSSBEATS_REV_SUNRISE_S2_OMNI
def handle_data_path_list_request(self, data: Dict) -> Dict:
return { "data": "" }
@cached(lifetime=86400)
def handle_data_path_list_request(self, data: Dict) -> Dict:
return {"data": ""}
@cached(lifetime=86400)
def handle_data_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss2_data/MusicArchiveList.csv") as music:
lines = music.readlines()
for line in lines:
line_split = line.split(',')
line_split = line.split(",")
ret_str += f"{line_split[0]},{line_split[1]},{line_split[2]},{line_split[3]},{line_split[4]},{line_split[5]},{line_split[6]},{line_split[7]},{line_split[8]},{line_split[9]},{line_split[10]},{line_split[11]},{line_split[12]},{line_split[13]},{line_split[14]},\r\n"
return({"data":ret_str})
@cached(lifetime=86400)
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_item_list_detail_request(self, data: Dict) -> Dict:
#ItemListIcon load
# ItemListIcon load
ret_str = "#ItemListIcon\r\n"
with open(r"titles/cxb/rss2_data/Item/ItemList_Icon.csv", encoding="utf-8") as item:
with open(
r"titles/cxb/rss2_data/Item/ItemList_Icon.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ItemListTitle load
# ItemListTitle load
ret_str += "\r\n#ItemListTitle\r\n"
with open(r"titles/cxb/rss2_data/Item/ItemList_Title.csv", encoding="utf-8") as item:
with open(
r"titles/cxb/rss2_data/Item/ItemList_Title.csv", encoding="utf-8"
) as item:
lines = item.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
@cached(lifetime=86400)
def handle_data_shop_list_detail_request(self, data: Dict) -> Dict:
#ShopListIcon load
# ShopListIcon load
ret_str = "#ShopListIcon\r\n"
with open(r"titles/cxb/rss2_data/Shop/ShopList_Icon.csv", encoding="utf-8") as shop:
with open(
r"titles/cxb/rss2_data/Shop/ShopList_Icon.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListMusic load
ret_str += "\r\n#ShopListMusic\r\n"
with open(r"titles/cxb/rss2_data/Shop/ShopList_Music.csv", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListSale load
ret_str += "\r\n#ShopListSale\r\n"
with open(r"titles/cxb/rss2_data/Shop/ShopList_Sale.csv", encoding="shift-jis") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListSkinBg load
ret_str += "\r\n#ShopListSkinBg\r\n"
with open(r"titles/cxb/rss2_data/Shop/ShopList_SkinBg.csv", encoding="shift-jis") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListSkinEffect load
ret_str += "\r\n#ShopListSkinEffect\r\n"
with open(r"titles/cxb/rss2_data/Shop/ShopList_SkinEffect.csv", encoding="shift-jis") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListSkinNotes load
ret_str += "\r\n#ShopListSkinNotes\r\n"
with open(r"titles/cxb/rss2_data/Shop/ShopList_SkinNotes.csv", encoding="shift-jis") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
#ShopListTitle load
ret_str += "\r\n#ShopListTitle\r\n"
with open(r"titles/cxb/rss2_data/Shop/ShopList_Title.csv", encoding="utf-8") as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
def handle_data_extra_stage_list_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_ex0001_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_one_more_extra_list_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_bonus_list10100_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_oe0001_request(self, data: Dict) -> Dict:
return({"data":""})
def handle_data_free_coupon_request(self, data: Dict) -> Dict:
return({"data":""})
@cached(lifetime=86400)
# ShopListMusic load
ret_str += "\r\n#ShopListMusic\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_Music.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListSale load
ret_str += "\r\n#ShopListSale\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_Sale.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListSkinBg load
ret_str += "\r\n#ShopListSkinBg\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_SkinBg.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListSkinEffect load
ret_str += "\r\n#ShopListSkinEffect\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_SkinEffect.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListSkinNotes load
ret_str += "\r\n#ShopListSkinNotes\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_SkinNotes.csv", encoding="shift-jis"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
# ShopListTitle load
ret_str += "\r\n#ShopListTitle\r\n"
with open(
r"titles/cxb/rss2_data/Shop/ShopList_Title.csv", encoding="utf-8"
) as shop:
lines = shop.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return {"data": ret_str}
def handle_data_extra_stage_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_ex0001_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_one_more_extra_list_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_bonus_list10100_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_oe0001_request(self, data: Dict) -> Dict:
return {"data": ""}
def handle_data_free_coupon_request(self, data: Dict) -> Dict:
return {"data": ""}
@cached(lifetime=86400)
def handle_data_news_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss2_data/NewsList.csv", encoding="UTF-8") as news:
lines = news.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
def handle_data_tips_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_release_info_list_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
@cached(lifetime=86400)
def handle_data_random_music_list_request(self, data: Dict) -> Dict:
ret_str = ""
@@ -141,10 +160,12 @@ class CxbRevSunriseS2(CxbBase):
count = 0
for line in lines:
line_split = line.split(",")
ret_str += str(count) + "," + line_split[0] + "," + line_split[0] + ",\r\n"
ret_str += (
str(count) + "," + line_split[0] + "," + line_split[0] + ",\r\n"
)
return {"data": ret_str}
return({"data":ret_str})
@cached(lifetime=86400)
def handle_data_license_request(self, data: Dict) -> Dict:
ret_str = ""
@@ -152,54 +173,58 @@ class CxbRevSunriseS2(CxbBase):
lines = licenses.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_course_list_request(self, data: Dict) -> Dict:
ret_str = ""
with open(r"titles/cxb/rss2_data/Course/CourseList.csv", encoding="UTF-8") as course:
with open(
r"titles/cxb/rss2_data/Course/CourseList.csv", encoding="UTF-8"
) as course:
lines = course.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_csxxxx_request(self, data: Dict) -> Dict:
extra_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
with open(fr"titles/cxb/rss2_data/Course/Cs{extra_num}.csv", encoding="shift-jis") as course:
with open(
rf"titles/cxb/rss2_data/Course/Cs{extra_num}.csv", encoding="shift-jis"
) as course:
lines = course.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data":ret_str})
return {"data": ret_str}
def handle_data_mission_list_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_mission_bonus_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_unlimited_mission_request(self, data: Dict) -> Dict:
return({"data":""})
return {"data": ""}
def handle_data_partner_list_request(self, data: Dict) -> Dict:
ret_str = ""
# Lord forgive me for the sins I am about to commit
for i in range(0,10):
for i in range(0, 10):
ret_str += f"80000{i},{i},{i},0,10000,,\r\n"
ret_str += f"80000{i},{i},{i},1,10500,,\r\n"
ret_str += f"80000{i},{i},{i},2,10500,,\r\n"
for i in range(10,13):
for i in range(10, 13):
ret_str += f"8000{i},{i},{i},0,10000,,\r\n"
ret_str += f"8000{i},{i},{i},1,10500,,\r\n"
ret_str += f"8000{i},{i},{i},2,10500,,\r\n"
ret_str +="\r\n---\r\n0,150,100,100,100,100,\r\n"
for i in range(1,130):
ret_str +=f"{i},100,100,100,100,100,\r\n"
ret_str += "\r\n---\r\n0,150,100,100,100,100,\r\n"
for i in range(1, 130):
ret_str += f"{i},100,100,100,100,100,\r\n"
ret_str += "---\r\n"
return({"data": ret_str})
return {"data": ret_str}
@cached(lifetime=86400)
def handle_data_partnerxxxx_request(self, data: Dict) -> Dict:
partner_num = int(data["dldate"]["filetype"][-4:])
@@ -208,55 +233,65 @@ class CxbRevSunriseS2(CxbBase):
lines = partner.readlines()
for line in lines:
ret_str += f"{line[:-1]}\r\n"
return({"data": ret_str})
return {"data": ret_str}
def handle_data_server_state_request(self, data: Dict) -> Dict:
return({"data": True})
return {"data": True}
def handle_data_settings_request(self, data: Dict) -> Dict:
return({"data": "2,\r\n"})
return {"data": "2,\r\n"}
def handle_data_story_list_request(self, data: Dict) -> Dict:
#story id, story name, game version, start time, end time, course arc, unlock flag, song mcode for menu
# story id, story name, game version, start time, end time, course arc, unlock flag, song mcode for menu
ret_str = "\r\n"
ret_str += f"st0000,RISING PURPLE,10104,1464370990,4096483201,Cs1000,-1,purple,\r\n"
ret_str += f"st0001,REBEL YELL,10104,1467999790,4096483201,Cs1000,-1,chaset,\r\n"
ret_str += (
f"st0000,RISING PURPLE,10104,1464370990,4096483201,Cs1000,-1,purple,\r\n"
)
ret_str += (
f"st0001,REBEL YELL,10104,1467999790,4096483201,Cs1000,-1,chaset,\r\n"
)
ret_str += f"st0002,REMNANT,10104,1502127790,4096483201,Cs1000,-1,overcl,\r\n"
return({"data": ret_str})
return {"data": ret_str}
def handle_data_stxxxx_request(self, data: Dict) -> Dict:
story_num = int(data["dldate"]["filetype"][-4:])
ret_str = ""
# Each stories appears to have 10 pieces based on the wiki but as on how they are set.... no clue
for i in range(1,11):
ret_str +=f"{i},st000{story_num}_{i-1},,,,,,,,,,,,,,,,1,,-1,1,\r\n"
return({"data": ret_str})
for i in range(1, 11):
ret_str += f"{i},st000{story_num}_{i-1},,,,,,,,,,,,,,,,1,,-1,1,\r\n"
return {"data": ret_str}
def handle_data_event_stamp_list_request(self, data: Dict) -> Dict:
return({"data":"Cs1002,1,1,1,1,1,1,1,1,1,1,\r\n"})
return {"data": "Cs1002,1,1,1,1,1,1,1,1,1,1,\r\n"}
def handle_data_premium_list_request(self, data: Dict) -> Dict:
return({"data": "1,,,,10,,,,,99,,,,,,,,,100,,\r\n"})
return {"data": "1,,,,10,,,,,99,,,,,,,,,100,,\r\n"}
def handle_data_event_list_request(self, data: Dict) -> Dict:
return({"data":"Cs4001,0,10000,1601510400,1604188799,1,nv2006,1,\r\nCs4005,0,10000,1609459200,1615766399,1,nv2006,1,\r\n"})
return {
"data": "Cs4001,0,10000,1601510400,1604188799,1,nv2006,1,\r\nCs4005,0,10000,1609459200,1615766399,1,nv2006,1,\r\n"
}
def handle_data_event_detail_list_request(self, data: Dict) -> Dict:
event_id = data["dldate"]["filetype"].split("/")[2]
if "Cs4001" in event_id:
return({"data":"#EventMusicList\r\n1,zonzon2,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n2,moonki,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n3,tricko,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n"})
return {
"data": "#EventMusicList\r\n1,zonzon2,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n2,moonki,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n3,tricko,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n"
}
elif "Cs4005" in event_id:
return({"data":"#EventMusicList\r\n2,firstl,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n2,valent,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n2,dazzli2,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n"})
return {
"data": "#EventMusicList\r\n2,firstl,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n2,valent,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n2,dazzli2,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,\r\n"
}
elif "EventStampMapListCs1002" in event_id:
return({"data":"1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"})
return {"data": "1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"}
elif "EventStampList" in event_id:
return({"data":"Cs1002,1,1,1,1,1,1,1,1,1,1,\r\n"})
return {"data": "Cs1002,1,1,1,1,1,1,1,1,1,1,\r\n"}
else:
return({"data":""})
return {"data": ""}
def handle_data_event_stamp_map_list_csxxxx_request(self, data: Dict) -> Dict:
event_id = data["dldate"]["filetype"].split("/")[2]
if "EventStampMapListCs1002" in event_id:
return({"data":"1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"})
return {"data": "1,2,1,1,2,3,9,5,6,7,8,9,10,\r\n"}
else:
return({"data":""})
return {"data": ""}

View File

@@ -14,32 +14,29 @@ energy = Table(
Column("user", ForeignKey("aime_user.id", ondelete="cascade"), nullable=False),
Column("energy", Integer, nullable=False, server_default="0"),
UniqueConstraint("user", name="cxb_rev_energy_uk"),
mysql_charset='utf8mb4'
mysql_charset="utf8mb4",
)
class CxbItemData(BaseData):
def put_energy(self, user_id: int, rev_energy: int) -> Optional[int]:
sql = insert(energy).values(
user = user_id,
energy = rev_energy
)
conflict = sql.on_duplicate_key_update(
energy = rev_energy
)
class CxbItemData(BaseData):
def put_energy(self, user_id: int, rev_energy: int) -> Optional[int]:
sql = insert(energy).values(user=user_id, energy=rev_energy)
conflict = sql.on_duplicate_key_update(energy=rev_energy)
result = self.execute(conflict)
if result is None:
self.logger.error(f"{__name__} failed to insert item! user: {user_id}, energy: {rev_energy}")
self.logger.error(
f"{__name__} failed to insert item! user: {user_id}, energy: {rev_energy}"
)
return None
return result.lastrowid
def get_energy(self, user_id: int) -> Optional[Dict]:
sql = energy.select(
and_(energy.c.user == user_id)
)
sql = energy.select(and_(energy.c.user == user_id))
result = self.execute(sql)
if result is None: return None
if result is None:
return None
return result.fetchone()

View File

@@ -16,57 +16,63 @@ profile = Table(
Column("index", Integer, nullable=False),
Column("data", JSON, nullable=False),
UniqueConstraint("user", "index", name="cxb_profile_uk"),
mysql_charset='utf8mb4'
mysql_charset="utf8mb4",
)
class CxbProfileData(BaseData):
def put_profile(self, user_id: int, version: int, index: int, data: JSON) -> Optional[int]:
def put_profile(
self, user_id: int, version: int, index: int, data: JSON
) -> Optional[int]:
sql = insert(profile).values(
user = user_id,
version = version,
index = index,
data = data
user=user_id, version=version, index=index, data=data
)
conflict = sql.on_duplicate_key_update(
index = index,
data = data
)
conflict = sql.on_duplicate_key_update(index=index, data=data)
result = self.execute(conflict)
if result is None:
self.logger.error(f"{__name__} failed to update! user: {user_id}, index: {index}, data: {data}")
self.logger.error(
f"{__name__} failed to update! user: {user_id}, index: {index}, data: {data}"
)
return None
return result.lastrowid
def get_profile(self, aime_id: int, version: int) -> Optional[List[Dict]]:
"""
Given a game version and either a profile or aime id, return the profile
"""
sql = profile.select(and_(
profile.c.version == version,
profile.c.user == aime_id
))
sql = profile.select(
and_(profile.c.version == version, profile.c.user == aime_id)
)
result = self.execute(sql)
if result is None: return None
if result is None:
return None
return result.fetchall()
def get_profile_index(self, index: int, aime_id: int = None, version: int = None) -> Optional[Dict]:
def get_profile_index(
self, index: int, aime_id: int = None, version: int = None
) -> Optional[Dict]:
"""
Given a game version and either a profile or aime id, return the profile
"""
if aime_id is not None and version is not None and index is not None:
sql = profile.select(and_(
sql = profile.select(
and_(
profile.c.version == version,
profile.c.user == aime_id,
profile.c.index == index
))
profile.c.index == index,
)
)
else:
self.logger.error(f"get_profile: Bad arguments!! aime_id {aime_id} version {version}")
self.logger.error(
f"get_profile: Bad arguments!! aime_id {aime_id} version {version}"
)
return None
result = self.execute(sql)
if result is None: return None
if result is None:
return None
return result.fetchone()

View File

@@ -18,7 +18,7 @@ score = Table(
Column("song_index", Integer),
Column("data", JSON),
UniqueConstraint("user", "song_mcode", "song_index", name="cxb_score_uk"),
mysql_charset='utf8mb4'
mysql_charset="utf8mb4",
)
playlog = Table(
@@ -40,7 +40,7 @@ playlog = Table(
Column("fail", Integer),
Column("combo", Integer),
Column("date_scored", TIMESTAMP, server_default=func.now()),
mysql_charset='utf8mb4'
mysql_charset="utf8mb4",
)
ranking = Table(
@@ -53,11 +53,19 @@ ranking = Table(
Column("score", Integer),
Column("clear", Integer),
UniqueConstraint("user", "rev_id", name="cxb_ranking_uk"),
mysql_charset='utf8mb4'
mysql_charset="utf8mb4",
)
class CxbScoreData(BaseData):
def put_best_score(self, user_id: int, song_mcode: str, game_version: int, song_index: int, data: JSON) -> Optional[int]:
def put_best_score(
self,
user_id: int,
song_mcode: str,
game_version: int,
song_index: int,
data: JSON,
) -> Optional[int]:
"""
Update the user's best score for a chart
"""
@@ -66,22 +74,37 @@ class CxbScoreData(BaseData):
song_mcode=song_mcode,
game_version=game_version,
song_index=song_index,
data=data
data=data,
)
conflict = sql.on_duplicate_key_update(
data = sql.inserted.data
)
conflict = sql.on_duplicate_key_update(data=sql.inserted.data)
result = self.execute(conflict)
if result is None:
self.logger.error(f"{__name__} failed to insert best score! profile: {user_id}, song: {song_mcode}, data: {data}")
self.logger.error(
f"{__name__} failed to insert best score! profile: {user_id}, song: {song_mcode}, data: {data}"
)
return None
return result.lastrowid
def put_playlog(self, user_id: int, song_mcode: str, chart_id: int, score: int, clear: int, flawless: int, this_super: int,
cool: int, this_fast: int, this_fast2: int, this_slow: int, this_slow2: int, fail: int, combo: int) -> Optional[int]:
def put_playlog(
self,
user_id: int,
song_mcode: str,
chart_id: int,
score: int,
clear: int,
flawless: int,
this_super: int,
cool: int,
this_fast: int,
this_fast2: int,
this_slow: int,
this_slow2: int,
fail: int,
combo: int,
) -> Optional[int]:
"""
Add an entry to the user's play log
"""
@@ -99,45 +122,42 @@ class CxbScoreData(BaseData):
slow=this_slow,
slow2=this_slow2,
fail=fail,
combo=combo
combo=combo,
)
result = self.execute(sql)
if result is None:
self.logger.error(f"{__name__} failed to insert playlog! profile: {user_id}, song: {song_mcode}, chart: {chart_id}")
self.logger.error(
f"{__name__} failed to insert playlog! profile: {user_id}, song: {song_mcode}, chart: {chart_id}"
)
return None
return result.lastrowid
def put_ranking(self, user_id: int, rev_id: int, song_id: int, score: int, clear: int) -> Optional[int]:
def put_ranking(
self, user_id: int, rev_id: int, song_id: int, score: int, clear: int
) -> Optional[int]:
"""
Add an entry to the user's ranking logs
"""
if song_id == 0:
sql = insert(ranking).values(
user=user_id,
rev_id=rev_id,
score=score,
clear=clear
user=user_id, rev_id=rev_id, score=score, clear=clear
)
else:
sql = insert(ranking).values(
user=user_id,
rev_id=rev_id,
song_id=song_id,
score=score,
clear=clear
user=user_id, rev_id=rev_id, song_id=song_id, score=score, clear=clear
)
conflict = sql.on_duplicate_key_update(
score = score
)
conflict = sql.on_duplicate_key_update(score=score)
result = self.execute(conflict)
if result is None:
self.logger.error(f"{__name__} failed to insert ranking log! profile: {user_id}, score: {score}, clear: {clear}")
self.logger.error(
f"{__name__} failed to insert ranking log! profile: {user_id}, score: {score}, clear: {clear}"
)
return None
return result.lastrowid
def get_best_score(self, user_id: int, song_mcode: int) -> Optional[Dict]:
@@ -146,21 +166,22 @@ class CxbScoreData(BaseData):
)
result = self.execute(sql)
if result is None: return None
if result is None:
return None
return result.fetchone()
def get_best_scores(self, user_id: int) -> Optional[Dict]:
sql = score.select(score.c.user == user_id)
result = self.execute(sql)
if result is None: return None
if result is None:
return None
return result.fetchall()
def get_best_rankings(self, user_id: int) -> Optional[List[Dict]]:
sql = ranking.select(
ranking.c.user == user_id
)
sql = ranking.select(ranking.c.user == user_id)
result = self.execute(sql)
if result is None: return None
if result is None:
return None
return result.fetchall()

View File

@@ -21,54 +21,75 @@ music = Table(
Column("artist", String(255)),
Column("category", String(255)),
Column("level", Float),
UniqueConstraint("version", "songId", "chartId", "index", name="cxb_static_music_uk"),
mysql_charset='utf8mb4'
UniqueConstraint(
"version", "songId", "chartId", "index", name="cxb_static_music_uk"
),
mysql_charset="utf8mb4",
)
class CxbStaticData(BaseData):
def put_music(self, version: int, mcode: str, index: int, chart: int, title: str, artist: str, category: str, level: float ) -> Optional[int]:
def put_music(
self,
version: int,
mcode: str,
index: int,
chart: int,
title: str,
artist: str,
category: str,
level: float,
) -> Optional[int]:
sql = insert(music).values(
version = version,
songId = mcode,
index = index,
chartId = chart,
title = title,
artist = artist,
category = category,
level = level
version=version,
songId=mcode,
index=index,
chartId=chart,
title=title,
artist=artist,
category=category,
level=level,
)
conflict = sql.on_duplicate_key_update(
title = title,
artist = artist,
category = category,
level = level
title=title, artist=artist, category=category, level=level
)
result = self.execute(conflict)
if result is None: return None
if result is None:
return None
return result.lastrowid
def get_music(self, version: int, song_id: Optional[int] = None) -> Optional[List[Row]]:
def get_music(
self, version: int, song_id: Optional[int] = None
) -> Optional[List[Row]]:
if song_id is None:
sql = select(music).where(music.c.version == version)
else:
sql = select(music).where(and_(
music.c.version == version,
music.c.songId == song_id,
))
sql = select(music).where(
and_(
music.c.version == version,
music.c.songId == song_id,
)
)
result = self.execute(sql)
if result is None: return None
if result is None:
return None
return result.fetchall()
def get_music_chart(self, version: int, song_id: int, chart_id: int) -> Optional[List[Row]]:
sql = select(music).where(and_(
music.c.version == version,
music.c.songId == song_id,
music.c.chartId == chart_id
))
def get_music_chart(
self, version: int, song_id: int, chart_id: int
) -> Optional[List[Row]]:
sql = select(music).where(
and_(
music.c.version == version,
music.c.songId == song_id,
music.c.chartId == chart_id,
)
)
result = self.execute(sql)
if result is None: return None
if result is None:
return None
return result.fetchone()