Sync Develop branch's update

This commit is contained in:
SoulGateKey
2025-07-22 03:37:39 +08:00
21 changed files with 615 additions and 37 deletions

View File

@@ -7,6 +7,7 @@ import logging
import coloredlogs
import urllib.parse
import math
import random
from typing import Dict, List, Any, Optional, Union, Final
from logging.handlers import TimedRotatingFileHandler
from starlette.requests import Request
@@ -17,7 +18,10 @@ from datetime import datetime
from enum import Enum
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Signature import PKCS1_v1_5
import os
from os import path, environ, mkdir, access, W_OK
from .config import CoreConfig
@@ -132,12 +136,29 @@ class AllnetServlet:
async def handle_poweron(self, request: Request):
request_ip = Utils.get_ip_addr(request)
pragma_header = request.headers.get('Pragma', "")
useragent_header = request.headers.get('User-Agent', "")
is_dfi = pragma_header == "DFI"
is_lite = useragent_header[5:] == "Windows/Lite"
lite_id = useragent_header[:4]
data = await request.body()
if not self.config.allnet.allnet_lite_keys and is_lite:
self.logger.error("!!!LITE KEYS NOT SET!!!")
raise AllnetRequestException()
elif is_lite:
for gameids, key in self.config.allnet.allnet_lite_keys.items():
if gameids == lite_id:
litekey = key
if is_lite and "litekey" not in locals():
self.logger.error("!!!UNIQUE LITE KEY NOT FOUND!!!")
raise AllnetRequestException()
try:
if is_dfi:
req_urlencode = self.from_dfi(data)
elif is_lite:
req_urlencode = self.dec_lite(litekey, data[:16], data)
else:
req_urlencode = data
@@ -145,20 +166,30 @@ class AllnetServlet:
if req_dict is None:
raise AllnetRequestException()
req = AllnetPowerOnRequest(req_dict[0])
if is_lite:
req = AllnetPowerOnRequestLite(req_dict[0])
else:
req = AllnetPowerOnRequest(req_dict[0])
# Validate the request. Currently we only validate the fields we plan on using
if not req.game_id or not req.ver or not req.serial or not req.ip or not req.firm_ver or not req.boot_ver:
if not req.game_id or not req.ver or not req.serial or not req.token and is_lite:
raise AllnetRequestException(
f"Bad auth request params from {request_ip} - {vars(req)}"
)
elif not is_lite:
if not req.game_id or not req.ver or not req.serial or not req.ip or not req.firm_ver or not req.boot_ver:
raise AllnetRequestException(
f"Bad auth request params from {request_ip} - {vars(req)}"
)
except AllnetRequestException as e:
if e.message != "":
self.logger.error(e)
return PlainTextResponse()
if req.format_ver == 3:
if is_lite:
resp = AllnetPowerOnResponseLite(req.token)
elif req.format_ver == 3:
resp = AllnetPowerOnResponse3(req.token)
elif req.format_ver == 2:
resp = AllnetPowerOnResponse2()
@@ -175,11 +206,14 @@ class AllnetServlet:
)
self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_machine.value
if is_lite:
resp.result = ALLNET_STAT.bad_machine.value
else:
resp.stat = ALLNET_STAT.bad_machine.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
if machine is not None:
if machine is not None and not is_lite:
arcade = await self.data.arcade.get_arcade(machine["arcade"])
if self.config.server.check_arcade_ip:
if arcade["ip"] and arcade["ip"] is not None and arcade["ip"] != req.ip:
@@ -257,7 +291,10 @@ class AllnetServlet:
)
self.logger.warning(msg)
resp.stat = ALLNET_STAT.bad_game.value
if is_lite:
resp.result = ALLNET_STAT.bad_game.value
else:
resp.stat = ALLNET_STAT.bad_game.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
@@ -265,8 +302,12 @@ class AllnetServlet:
self.logger.info(
f"Allowed unknown game {req.game_id} v{req.ver} to authenticate from {request_ip} due to 'is_develop' being enabled. S/N: {req.serial}"
)
resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/"
resp.host = f"{self.config.server.hostname}:{self.config.server.port}"
if is_lite:
resp.uri1 = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/"
resp.uri2 = f"{self.config.server.hostname}:{self.config.server.port}"
else:
resp.uri = f"http://{self.config.server.hostname}:{self.config.server.port}/{req.game_id}/{req.ver.replace('.', '')}/"
resp.host = f"{self.config.server.hostname}:{self.config.server.port}"
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
resp_str = urllib.parse.unquote(urllib.parse.urlencode(resp_dict))
@@ -277,10 +318,16 @@ class AllnetServlet:
int_ver = req.ver.replace(".", "")
try:
resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial)
if is_lite:
resp.uri1, resp.uri2 = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial)
else:
resp.uri, resp.host = TitleServlet.title_registry[req.game_id].get_allnet_info(req.game_id, int(int_ver), req.serial)
except Exception as e:
self.logger.error(f"Error running get_allnet_info for {req.game_id} - {e}")
resp.stat = ALLNET_STAT.bad_game.value
if is_lite:
resp.result = ALLNET_STAT.bad_game.value
else:
resp.stat = ALLNET_STAT.bad_game.value
resp_dict = {k: v for k, v in vars(resp).items() if v is not None}
return PlainTextResponse(urllib.parse.unquote(urllib.parse.urlencode(resp_dict)) + "\n")
@@ -308,18 +355,38 @@ class AllnetServlet:
"Pragma": "DFI",
},
)
elif is_lite:
iv = bytes([random.randint(2, 255) for _ in range(16)])
return PlainTextResponse(content=self.enc_lite(litekey, iv, resp_str))
return PlainTextResponse(resp_str)
async def handle_dlorder(self, request: Request):
request_ip = Utils.get_ip_addr(request)
pragma_header = request.headers.get('Pragma', "")
useragent_header = request.headers.get('User-Agent', "")
is_dfi = pragma_header == "DFI"
is_lite = useragent_header[5:] == "Windows/Lite"
lite_id = useragent_header[:4]
data = await request.body()
if not self.config.allnet.allnet_lite_keys and is_lite:
self.logger.error("!!!LITE KEYS NOT SET!!!")
raise AllnetRequestException()
elif is_lite:
for gameids, key in self.config.allnet.allnet_lite_keys.items():
if gameids == lite_id:
litekey = key
if is_lite and "litekey" not in locals():
self.logger.error("!!!UNIQUE LITE KEY NOT FOUND!!!")
raise AllnetRequestException()
try:
if is_dfi:
req_urlencode = self.from_dfi(data)
elif is_lite:
req_urlencode = self.dec_lite(litekey, data[:16], data)
else:
req_urlencode = data.decode()
@@ -327,7 +394,10 @@ class AllnetServlet:
if req_dict is None:
raise AllnetRequestException()
req = AllnetDownloadOrderRequest(req_dict[0])
if is_lite:
req = AllnetDownloadOrderRequestLite(req_dict[0])
else:
req = AllnetDownloadOrderRequest(req_dict[0])
# Validate the request. Currently we only validate the fields we plan on using
if not req.game_id or not req.ver or not req.serial:
@@ -343,7 +413,11 @@ class AllnetServlet:
self.logger.info(
f"DownloadOrder from {request_ip} -> {req.game_id} v{req.ver} serial {req.serial}"
)
resp = AllnetDownloadOrderResponse(serial=req.serial)
if is_lite:
resp = AllnetDownloadOrderResponseLite()
else:
resp = AllnetDownloadOrderResponse(serial=req.serial)
if (
not self.config.allnet.allow_online_updates
@@ -354,6 +428,9 @@ class AllnetServlet:
return PlainTextResponse(
self.to_dfi(resp) + b"\r\n", headers={ "Pragma": "DFI" }
)
elif is_lite:
iv = bytes([random.randint(2, 255) for _ in range(16)])
return PlainTextResponse(content=self.enc_lite(litekey, iv, resp))
return PlainTextResponse(resp)
else:
@@ -364,6 +441,9 @@ class AllnetServlet:
return PlainTextResponse(
self.to_dfi(resp) + b"\r\n", headers={ "Pragma": "DFI" }
)
elif is_lite:
iv = bytes([random.randint(2, 255) for _ in range(16)])
return PlainTextResponse(content=self.enc_lite(litekey, iv, resp))
return PlainTextResponse(resp)
if path.exists(
@@ -393,6 +473,9 @@ class AllnetServlet:
"Pragma": "DFI",
},
)
elif is_lite:
iv = bytes([random.randint(2, 255) for _ in range(16)])
return PlainTextResponse(content=self.enc_lite(litekey, iv, res_str))
return PlainTextResponse(res_str)
@@ -517,6 +600,17 @@ class AllnetServlet:
zipped = zlib.compress(unzipped)
return base64.b64encode(zipped)
def dec_lite(self, key, iv, data):
cipher = AES.new(bytes(key), AES.MODE_CBC, iv)
decrypted = cipher.decrypt(data)
return decrypted[16:].decode("utf-8")
def enc_lite(self, key, iv, data):
unencrypted = pad(bytes([0] * 16) + data.encode('utf-8'), 16)
cipher = AES.new(bytes(key), AES.MODE_CBC, iv)
encrypted = cipher.encrypt(unencrypted)
return encrypted
class BillingServlet:
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
self.config = core_cfg
@@ -773,6 +867,15 @@ class AllnetPowerOnResponse:
self.minute = datetime.now().minute
self.second = datetime.now().second
class AllnetPowerOnRequestLite:
def __init__(self, req: Dict) -> None:
if req is None:
raise AllnetRequestException("Request processing failed")
self.game_id: str = req.get("title_id", None)
self.ver: str = req.get("title_ver", None)
self.serial: str = req.get("client_id", None)
self.token: str = req.get("token", None)
class AllnetPowerOnResponse3(AllnetPowerOnResponse):
def __init__(self, token) -> None:
super().__init__()
@@ -804,6 +907,30 @@ class AllnetPowerOnResponse2(AllnetPowerOnResponse):
self.timezone = "+09:00"
self.res_class = "PowerOnResponseV2"
class AllnetPowerOnResponseLite:
def __init__(self, token) -> None:
# Custom Allnet Lite response
self.result = 1
self.place_id = "0123"
self.uri1 = ""
self.uri2 = ""
self.name = "ARTEMiS"
self.nickname = "ARTEMiS"
self.setting = "1"
self.region0 = "1"
self.region_name0 = "W"
self.region_name1 = ""
self.region_name2 = ""
self.region_name3 = ""
self.country = "CHN"
self.location_type = "1"
self.utc_time = datetime.now(tz=pytz.timezone("UTC")).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
self.client_timezone = "+0800"
self.res_ver = "3"
self.token = token
class AllnetDownloadOrderRequest:
def __init__(self, req: Dict) -> None:
self.game_id = req.get("game_id", "")
@@ -811,12 +938,23 @@ class AllnetDownloadOrderRequest:
self.serial = req.get("serial", "")
self.encode = req.get("encode", "")
class AllnetDownloadOrderRequestLite:
def __init__(self, req: Dict) -> None:
self.game_id = req.get("title_id", "")
self.ver = req.get("title_ver", "")
self.serial = req.get("client_id", "")
class AllnetDownloadOrderResponse:
def __init__(self, stat: int = 1, serial: str = "", uri: str = "null") -> None:
self.stat = stat
self.serial = serial
self.uri = uri
class AllnetDownloadOrderResponseLite:
def __init__(self, result: int = 1, uri: str = "null") -> None:
self.result = result
self.uri = uri
class TraceDataType(Enum):
CHARGE = 0
EVENT = 1
@@ -1068,7 +1206,9 @@ app_billing = Starlette(
allnet = AllnetServlet(cfg, cfg_dir)
route_lst = [
Route("/sys/servlet/PowerOn", allnet.handle_poweron, methods=["GET", "POST"]),
Route("/net/initialize", allnet.handle_poweron, methods=["GET", "POST"]),
Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]),
Route("/net/delivery/instruction", allnet.handle_dlorder, methods=["GET", "POST"]),
Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]),
Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]),
Route("/naomitest.html", allnet.handle_naomitest),

View File

@@ -11,6 +11,7 @@ from typing import List
from core import CoreConfig, TitleServlet, MuchaServlet
from core.allnet import AllnetServlet, BillingServlet
from core.chimedb import ChimeServlet
from core.frontend import FrontendServlet
async def dummy_rt(request: Request):
@@ -75,7 +76,9 @@ if not cfg.allnet.standalone:
allnet = AllnetServlet(cfg, cfg_dir)
route_lst += [
Route("/sys/servlet/PowerOn", allnet.handle_poweron, methods=["GET", "POST"]),
Route("/net/initialize", allnet.handle_poweron, methods=["GET", "POST"]),
Route("/sys/servlet/DownloadOrder", allnet.handle_dlorder, methods=["GET", "POST"]),
Route("/net/delivery/instruction", allnet.handle_dlorder, methods=["GET", "POST"]),
Route("/sys/servlet/LoaderStateRecorder", allnet.handle_loaderstaterecorder, methods=["GET", "POST"]),
Route("/sys/servlet/Alive", allnet.handle_alive, methods=["GET", "POST"]),
Route("/naomitest.html", allnet.handle_naomitest),
@@ -87,6 +90,14 @@ if not cfg.allnet.standalone:
Route("/dl/ini/{file:str}", allnet.handle_dlorder_ini),
]
if cfg.chimedb.enable:
chimedb = ChimeServlet(cfg, cfg_dir)
route_lst += [
Route("/wc_aime/api/alive_check", chimedb.handle_qr_alive, methods=["POST"]),
Route("/qrcode/api/alive_check", chimedb.handle_qr_alive, methods=["POST"]),
Route("/wc_aime/api/get_data", chimedb.handle_qr_lookup, methods=["POST"])
]
for code, game in title.title_registry.items():
route_lst += game.get_routes()

139
core/chimedb.py Normal file
View File

@@ -0,0 +1,139 @@
import hashlib
import json
import logging
from enum import Enum
from logging.handlers import TimedRotatingFileHandler
import coloredlogs
from starlette.responses import PlainTextResponse
from starlette.requests import Request
from core.config import CoreConfig
from core.data import Data
class ChimeDBStatus(Enum):
NONE = 0
READER_SETUP_FAIL = 1
READER_ACCESS_FAIL = 2
READER_INCOMPATIBLE = 3
DB_RESOLVE_FAIL = 4
DB_ACCESS_TIMEOUT = 5
DB_ACCESS_FAIL = 6
AIME_ID_INVALID = 7
NO_BOARD_INFO = 8
LOCK_BAN_SYSTEM_USER = 9
LOCK_BAN_SYSTEM = 10
LOCK_BAN_USER = 11
LOCK_BAN = 12
LOCK_SYSTEM_USER = 13
LOCK_SYSTEM = 14
LOCK_USER = 15
class ChimeServlet:
def __init__(self, core_cfg: CoreConfig, cfg_folder: str) -> None:
self.config = core_cfg
self.config_folder = cfg_folder
self.data = Data(core_cfg)
self.logger = logging.getLogger("chimedb")
if not hasattr(self.logger, "initted"):
log_fmt_str = "[%(asctime)s] Chimedb | %(levelname)s | %(message)s"
log_fmt = logging.Formatter(log_fmt_str)
fileHandler = TimedRotatingFileHandler(
"{0}/{1}.log".format(self.config.server.log_dir, "chimedb"),
when="d",
backupCount=10,
)
fileHandler.setFormatter(log_fmt)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(log_fmt)
self.logger.addHandler(fileHandler)
self.logger.addHandler(consoleHandler)
self.logger.setLevel(self.config.aimedb.loglevel)
coloredlogs.install(
level=core_cfg.aimedb.loglevel, logger=self.logger, fmt=log_fmt_str
)
self.logger.initted = True
if not core_cfg.chimedb.key:
self.logger.error("!!!KEY NOT SET!!!")
exit(1)
self.logger.info("Serving")
async def handle_qr_alive(self, request: Request):
return PlainTextResponse("alive")
async def handle_qr_lookup(self, request: Request) -> bytes:
req = json.loads(await request.body())
access_code = req["qrCode"][-20:]
timestamp = req["timestamp"]
try:
userId = await self._lookup(access_code)
data = json.dumps({
"userID": userId,
"errorID": 0,
"timestamp": timestamp,
"key": self._hash_key(userId, timestamp)
})
except Exception as e:
self.logger.error(e.with_traceback(None))
data = json.dumps({
"userID": -1,
"errorID": ChimeDBStatus.DB_ACCESS_FAIL,
"timestamp": timestamp,
"key": self._hash_key(-1, timestamp)
})
return PlainTextResponse(data)
def _hash_key(self, chip_id, timestamp):
input_string = f"{chip_id}{timestamp}{self.config.chimedb.key}"
hash_object = hashlib.sha256(input_string.encode('utf-8'))
hex_dig = hash_object.hexdigest()
formatted_hex = format(int(hex_dig, 16), '064x').upper()
return formatted_hex
async def _lookup(self, access_code):
user_id = await self.data.card.get_user_id_from_card(access_code)
self.logger.info(f"access_code {access_code} -> user_id {user_id}")
if not user_id or user_id <= 0:
user_id = await self._register(access_code)
return user_id
async def _register(self, access_code):
user_id = -1
if self.config.server.allow_user_registration:
user_id = await self.data.user.create_user()
if user_id is None:
self.logger.error("Failed to register user!")
user_id = -1
else:
card_id = await self.data.card.create_card(user_id, access_code)
if card_id is None:
self.logger.error("Failed to register card!")
user_id = -1
self.logger.info(
f"Register access code {access_code} -> user_id {user_id}"
)
else:
self.logger.info(f"Registration blocked!: access code {access_code}")
return user_id

View File

@@ -1,7 +1,7 @@
import logging
import os
import ssl
from typing import Any, Union
from typing import Any, Union, Dict
from typing_extensions import Optional
@@ -378,6 +378,11 @@ class AllnetConfig:
return CoreConfig.get_config_field(
self.__config, "core", "allnet", "save_billing", default=False
)
@property
def allnet_lite_keys(self) -> Dict:
return CoreConfig.get_config_field(
self.__config, "core", "allnet", "allnet_lite_keys", default={}
)
class BillingConfig:
def __init__(self, parent_config: "CoreConfig") -> None:
@@ -469,6 +474,28 @@ class AimedbConfig:
self.__config, "core", "aimedb", "id_lifetime_seconds", default=86400
)
class ChimedbConfig:
def __init__(self, parent_config: "CoreConfig") -> None:
self.__config = parent_config
@property
def enable(self) -> bool:
return CoreConfig.get_config_field(
self.__config, "core", "chimedb", "enable", default=True
)
@property
def loglevel(self) -> int:
return CoreConfig.str_to_loglevel(
CoreConfig.get_config_field(
self.__config, "core", "chimedb", "loglevel", default="info"
)
)
@property
def key(self) -> str:
return CoreConfig.get_config_field(
self.__config, "core", "chimedb", "key", default=""
)
class MuchaConfig:
def __init__(self, parent_config: "CoreConfig") -> None:
self.__config = parent_config
@@ -490,6 +517,7 @@ class CoreConfig(dict):
self.allnet = AllnetConfig(self)
self.billing = BillingConfig(self)
self.aimedb = AimedbConfig(self)
self.chimedb = ChimedbConfig(self)
self.mucha = MuchaConfig(self)
@classmethod