This commit is contained in:
55_Lar 2025-06-27 18:57:00 +08:00
commit 6a94dea065
29 changed files with 1620 additions and 0 deletions

16
DataBase/Config.json Normal file
View File

@ -0,0 +1,16 @@
{
"Keychip":"Move to DataBase/keychip.db",
"SERVER": {
"GameHost": "maimai-gm.wahlap.com",
"GamePort": 42081,
"GameSalt": "BEs2D5vW",
"GameaesKey": "a>32bVP7v<63BVLkY[xM>daZ1s9MBP<R",
"GameaesIV": "d6xHIKq]1J]Dt^ue",
"ObfsParam": "OBFS_PARAM",
"ChimeHost": "ai.sys-allnet.cn",
"ChimeSalt": "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW"
},
"DivingFishToken": "Db5BYpaSdAm7qHig1ZkC8zeWxrcl06VO",
"lxnsToken": "J0B7iloZ6j36zFr2yPi6ODO5DiAeTUwHUfTIRBPb_3Q=",
"version": "1.2.4"
}

BIN
DataBase/keychip.db Normal file

Binary file not shown.

BIN
Lib/7z/7-zip.dll Normal file

Binary file not shown.

BIN
Lib/7z/7-zip32.dll Normal file

Binary file not shown.

BIN
Lib/7z/7z.dll Normal file

Binary file not shown.

BIN
Lib/7z/7z.exe Normal file

Binary file not shown.

Binary file not shown.

16
Lib/API/GetApi.py Normal file
View File

@ -0,0 +1,16 @@
import sys
sys.dont_write_bytecode = True
#不生成Pycache
from Net.Net import request
import asyncio
async def getPreview(userID):
    """Fetch a user's preview profile via the GetUserPreviewApi endpoint.

    :param userID: numeric maimai user id
    :return: decoded JSON response dict ({} on transport failure, per
             Net.request's error handling)
    """
    previewData = {
        "userId": userID,
        "segaIdAuthKey": ""
    }
    response = await request("GetUserPreviewApi", previewData)
    return response


if __name__ == "__main__":
    # Was an unguarded module-level call: importing this module fired a live
    # network request with a hard-coded user id. Guard it like Net.Net does.
    asyncio.run(getPreview(11203000))

45
Lib/API/LogApi.py Normal file
View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
import sys
sys.dont_write_bytecode = True
from Net.Net import request
# Arcade client identity / crypto constants for the maimai title server.
# NOTE(review): duplicated from Net.Net — consider importing from one place.
KEYCHIP = "A63E01C3047"      # keychip (client) id, sent as clientId
REGION_ID = "8"
PLACE_ID = "1003"
OBFS_PARAM = "B44df8yT"      # salt for the endpoint-name obfuscation hash
AES_KEY = b"a>32bVP7v<63BVLkY[xM>daZ1s9MBP<R"  # AES-256-CBC key
AES_IV = b"d6xHIKq]1J]Dt^ue"                   # static CBC IV
HOST = "maimai-gm.wahlap.com"
PORT = 42081
async def logout(user_id, timeStamp):
    """Send UserLogoutApi for *user_id* at *timeStamp*; return the response dict."""
    payload = {
        "userId": user_id,
        "accessCode": "",
        "regionId": REGION_ID,
        "placeId": PLACE_ID,
        "clientId": KEYCHIP,
        "dateTime": timeStamp,
        "type": 1,
    }
    return await request("UserLogoutApi", payload)
# Log the user into an arcade session.
async def login(user_id, dateTime):
    """Send UserLoginApi for *user_id* at *dateTime*; return the response dict."""
    payload = {
        "userId": user_id,
        "accessCode": "",
        "regionId": REGION_ID,
        "placeId": PLACE_ID,
        "clientId": KEYCHIP,
        "dateTime": dateTime,
        "isContinue": False,
        "genericFlag": 0,
    }
    return await request("UserLoginApi", payload)

158
Lib/API/Net/Net.py Normal file
View File

@ -0,0 +1,158 @@
import sys
sys.dont_write_bytecode = True
import asyncio
import hashlib
import json
import struct
import zlib
import httpx
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from loguru import logger
# Arcade client identity / crypto constants (same values as LogApi).
KEYCHIP = "A63E01C3047"      # keychip (client) id, sent as clientId
REGION_ID = "8"
PLACE_ID = "1003"
OBFS_PARAM = "B44df8yT"      # salt for the endpoint-name obfuscation hash
AES_KEY = b"a>32bVP7v<63BVLkY[xM>daZ1s9MBP<R"  # AES-256-CBC key
AES_IV = b"d6xHIKq]1J]Dt^ue"                   # static CBC IV
HOST = "maimai-gm.wahlap.com"
PORT = 42081
aimeVer = "1.0.0"
aimeInfo = ""
serverVer = "1.50.0"
serverInfo = ""
# global_PlaylogId = None
# Logging: console shows INFO+, debug.log captures DEBUG+;
# enqueue=True makes the sinks safe to use from async code.
logger.remove()
logger.add(sys.stdout, level="INFO", enqueue=True)
logger.add("debug.log", level="DEBUG", enqueue=True)
def obfuscator(src_str, obfuscate_param):
    """Return the hex MD5 digest of *src_str* + *obfuscate_param*.

    Used to obfuscate endpoint names in the servlet path and User-Agent.
    """
    return hashlib.md5((src_str + obfuscate_param).encode("utf-8")).hexdigest()
def decompress(data):
    """Best-effort zlib inflate: return the inflated bytes, or *data*
    unchanged when it is not a valid zlib stream."""
    result = data
    try:
        result = zlib.decompress(data)
    except zlib.error:
        pass
    return result
# Encrypt data
def aes_encrypt(plaintext, key, iv):
    """PKCS#7-pad *plaintext* and encrypt it with AES-CBC(key, iv)."""
    padded = pad(plaintext, AES.block_size, style="pkcs7")
    return AES.new(key, AES.MODE_CBC, iv).encrypt(padded)
# Decrypt data
def aes_decrypt(ciphertext, key, iv):
    """Decrypt AES-CBC *ciphertext* and strip the PKCS#7 padding."""
    raw = AES.new(key, AES.MODE_CBC, iv).decrypt(ciphertext)
    return unpad(raw, AES.block_size, style="pkcs7")
def hash_data(chip_id, timestamp, salt):
    """Return uppercase hex SHA-256 of chip_id||timestamp||salt (aime auth key)."""
    digest = hashlib.sha256(f"{chip_id}{timestamp}{salt}".encode("utf-8"))
    return digest.hexdigest().upper()
# Send a custom title-server request.
async def request(req, data):
    """POST *data* (JSON -> deflate -> AES-CBC) to the obfuscated endpoint.

    :param req: API name without the "MaimaiChn" suffix (appended here)
    :param data: JSON-serialisable request payload
    :return: decoded response dict, or {} on any transport/decode failure
    """
    req = req + "MaimaiChn"
    obfs = obfuscator(req, OBFS_PARAM)  # hash once; reused for path and UA
    path = "/Maimai2Servlet/" + obfs
    plaintext = json.dumps(data, ensure_ascii=False).encode("utf-8")
    comp = zlib.compress(plaintext)
    encdata = aes_encrypt(comp, AES_KEY, AES_IV)
    header = {
        "charset": "UTF-8",
        "Mai-Encoding": "1.50",
        "Content-Type": "application/json",
        "Content-Encoding": "deflate",
        "User-Agent": f"{obfs}#",
        "tlr": "-1",
        "number": "0",
    }
    logger.debug(f"header: {header}")
    async with httpx.AsyncClient(timeout=30) as client:
        # Strip httpx's default headers; the server expects exactly ours.
        client.headers.clear()
        try:
            response = await client.request(
                url=f"https://{HOST}:{PORT}{path}",
                method="POST",
                headers=header,
                content=encdata,
            )
        except httpx.HTTPError as e:  # was a bare except: narrow and log cause
            logger.debug(e)
            logger.error("请求出错了(#°Д°)")
            return {}
        encrypted_response = response.content
        if response.status_code == 200:
            try:
                decrypted_data = aes_decrypt(encrypted_response, AES_KEY, AES_IV)
                logger.debug(decrypted_data)
                decompressed_data = decompress(decrypted_data)
                text = decompressed_data.decode("utf-8")  # decode once, reuse
                logger.info(text)
                return json.loads(text)
            except Exception as e:
                try:
                    logger.error(e)
                    logger.warning(response.read().decode("utf-8"))
                except Exception:  # was bare except: body itself undecodable
                    logger.warning("解码失败请检查IP")
                return {}
        else:
            logger.error(response.read().decode("utf-8"))
            return {}
async def getUserID(qrStr):
    """Resolve a QR-code payload to a maimai user id via the aime server.

    :param qrStr: raw QR string; chars [8:20] hold a timestamp, the rest the
                  one-time code (format assumed "SGWCMAID..." — TODO confirm)
    :return: the user id, or 0 when the server answers userID == -1
    :raises httpx.HTTPStatusError: on a non-2xx aime-server response
    """
    chip_id = "A63E01C2896"
    timestamp = int(qrStr[8:20])
    reqStr = qrStr[20:]
    data = (
        f'{{"chipID":"{chip_id}","openGameID":"MAID",'
        f'"key":"{hash_data(chip_id, timestamp, "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW")}",'
        f'"qrCode":"{reqStr}","timestamp":"{timestamp}"}}'
    )
    headers = {
        "Host": "ai.sys-allnet.cn",
        "User-Agent": "WC_AIME_LIB",
        "Content-Length": str(len(data)),
    }
    url = "http://ai.sys-allnet.cn/wc_aime/api/get_data"  # was a no-op f-string
    async with httpx.AsyncClient() as client:
        response = await client.post(url, headers=headers, data=data)
        response.raise_for_status()
    response_json = response.json()
    if response_json["userID"] == -1:
        returnID = 0  # was the odd literal 00000000 (== 0); "not found" sentinel
        logger.error("获取UserID失败请检查二维码")
    else:
        returnID = response_json["userID"]
        logger.info(f"获取UserID成功: {returnID}")
    return returnID
async def main():
    """Smoke test: ping the title server and print the decoded reply."""
    data = await request("Ping", {})
    print(data)


if __name__ == "__main__":
    asyncio.run(main())

3
Lib/API/Net/__init__.py Normal file
View File

@ -0,0 +1,3 @@
import sys
# Keep this package from emitting __pycache__ directories.
sys.dont_write_bytecode = True

0
Lib/API/__init__.py Normal file
View File

55
Lib/OPT/SDGBauth.py Normal file
View File

@ -0,0 +1,55 @@
import sys
sys.dont_write_bytecode = True
import dataclasses
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
import base64
import httpx
def enc(key, iv, data):
    """AES-CBC encrypt *data* (caller is responsible for padding)."""
    return AES.new(key, AES.MODE_CBC, iv).encrypt(data)
def dec(key, iv, data):
    """AES-CBC decrypt *data* (padding, if any, is left in place)."""
    return AES.new(key, AES.MODE_CBC, iv).decrypt(data)
def getOptUrl():
    """Ask the ALL.Net delivery server for the SDGB option-image URL.

    Builds the fixed-key AES-CBC "delivery instruction" request, posts it,
    decrypts the reply (its first 16 bytes double as the CBC IV), and
    returns the first URL containing "/option/", or "" when none is found.
    """
    # Hard-coded delivery AES key; request IV is all zeroes.
    key = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
    iv = bytes.fromhex("00000000000000000000000000000000")
    ua = "SDGB;Windows/Lite"
    # 16 zero bytes of leading padding, then the title query string.
    content = (
        bytes([0] * 16)
        + b"title_id=SDGB&title_ver=1.50&client_id=A63E01C2805&token=205648745"
    )
    header = bytes.fromhex("00000000000000000000000000000000")
    bytes_data = pad(header + content, 16)
    encrypted = enc(key, iv, bytes_data)
    r = httpx.post(
        "http://at.sys-allnet.cn/net/delivery/instruction",
        data=encrypted,
        headers={"User-Agent": ua, "Pragma": "DFI"},
    )
    resp_data = r.content
    # The reply's first 16 bytes are the IV; strip them after decrypting.
    decrypted = dec(key, resp_data[:16], resp_data)
    decrypted_bytes = decrypted[16:]
    decrypted_str = decrypted_bytes.decode("UTF-8")
    # Parse the "&uri=" field: a '|'-separated URL list; pick the /option/ one.
    if "&uri=" in decrypted_str:
        uri_part = decrypted_str.split("&uri=", 1)[1]
        urls = uri_part.split("|")
        for url in urls:
            if "/option/" in url:
                return url  # first matching option URL
    return ""  # no option URL found
if __name__ == "__main__":
    # Manual check: print the resolved option URL.
    print(getOptUrl())

3
Lib/OPT/__init__.py Normal file
View File

@ -0,0 +1,3 @@
import sys
# Keep this package from emitting __pycache__ directories.
sys.dont_write_bytecode = True

0
Lib/__init__.py Normal file
View File

266
Lib/decryptOpt.py Normal file
View File

@ -0,0 +1,266 @@
import sys
sys.dont_write_bytecode = True
import asyncio
import os
import re
import shutil
import zipfile
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import glob
class OptProcessor:
    """Pipeline for SDGB OPT packages:
    OPT -> VHD (unsega.exe) -> extracted dir (7-Zip) -> renamed "Axxx" dir -> zip.
    """

    def __init__(self):
        # All external tools are resolved relative to this file's directory.
        self.base_dir = os.path.dirname(os.path.abspath(__file__))
        self.unsega_path = os.path.join(self.base_dir, "unsega.exe")
        self.seven_zip_dir = os.path.join(self.base_dir, "7z")
        self.seven_zip_path = os.path.join(self.seven_zip_dir, "7z.exe")
        self.exfat_dll_path = os.path.join(self.seven_zip_dir, "Formats", "ExFat7z.64.dll")

    async def process_with_unsega(self, opt_path: str) -> str:
        """Run unsega.exe on *opt_path*; return the path of the produced VHD.

        :raises FileNotFoundError: when unsega.exe is missing
        :raises TimeoutError: when no VHD appears within ~10 s of the run
        """
        if not os.path.exists(self.unsega_path):
            raise FileNotFoundError(f"未找到 unsega.exe: {self.unsega_path}")
        # Use absolute paths so the external tool resolves them consistently.
        opt_path = os.path.abspath(opt_path)
        opt_dir = os.path.dirname(opt_path)
        vhd_filename = os.path.splitext(os.path.basename(opt_path))[0] + ".vhd"
        vhd_path = os.path.abspath(os.path.join(opt_dir, vhd_filename))
        # Remove any stale VHD so the existence poll below is meaningful.
        if os.path.exists(vhd_path):
            os.remove(vhd_path)
        proc = await asyncio.create_subprocess_exec(
            self.unsega_path,
            opt_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        await proc.communicate()
        # unsega may exit slightly before the file becomes visible; poll briefly.
        max_wait = 10
        for _ in range(max_wait):
            if os.path.exists(vhd_path):
                return vhd_path
            await asyncio.sleep(1)
        raise TimeoutError(f"未检测到VHD文件生成: {vhd_path}")

    async def extract_vhd_with_7zip(self, vhd_path: str) -> str:
        """Extract *vhd_path* with 7z.exe; return the extraction directory.

        :raises FileNotFoundError: VHD or 7-Zip executable missing
        :raises RuntimeError: 7-Zip failed or produced an empty directory
        """
        vhd_path = os.path.abspath(vhd_path)
        if not os.path.exists(vhd_path):
            raise FileNotFoundError(f"VHD文件不存在: {vhd_path}")
        if not os.path.exists(self.seven_zip_path):
            raise FileNotFoundError(f"未找到7-Zip程序: {self.seven_zip_path}")
        extract_dir = os.path.abspath(os.path.splitext(vhd_path)[0])
        if not os.path.exists(extract_dir):
            os.makedirs(extract_dir)
        command = [
            self.seven_zip_path,
            "x",
            vhd_path,
            f"-o{extract_dir}",
            "-y"
        ]
        # cwd=seven_zip_dir so 7z can find its format plugins (ExFat dll).
        proc = await asyncio.create_subprocess_exec(
            *command,
            cwd=self.seven_zip_dir,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        # communicate() already waited for exit; the second `await proc.wait()`
        # was redundant — returncode is set by now.
        return_code = proc.returncode
        if return_code != 0:
            # 7z on Chinese Windows usually emits GBK; fall back to UTF-8.
            # (decode with errors="ignore" never raises, so the old bare
            # try/except here was dead code.)
            error_msg = stderr.decode('gbk', errors='ignore')
            if not error_msg:
                error_msg = stderr.decode('utf-8', errors='ignore')
            raise RuntimeError(f"7-Zip解压失败: {error_msg}")
        if not os.path.exists(extract_dir) or not os.listdir(extract_dir):
            raise RuntimeError(f"解压目录为空: {extract_dir}")
        return extract_dir

    def _rename_extracted_dir(self, extract_dir: str) -> str:
        """Rename 'SDGB_Axxx_<n>_<m>' extraction dirs to just 'Axxx'."""
        folder_name = os.path.basename(extract_dir)
        pattern = r'^SDGB_(A\d{3})_\d+_\d$'
        match = re.match(pattern, folder_name)
        if match:
            new_name = match.group(1)
            new_path = os.path.join(os.path.dirname(extract_dir), new_name)
            if os.path.exists(new_path):
                shutil.rmtree(new_path)
            os.rename(extract_dir, new_path)
            return new_path
        return extract_dir  # no version tag found: keep the original path

    def _compress_folder_to_zip(self, folder_path: str) -> str:
        """Zip *folder_path* (deflate) next to it; return the zip path."""
        parent_dir = os.path.dirname(folder_path)
        zip_path = os.path.join(parent_dir, os.path.basename(folder_path) + ".zip")
        # Replace any pre-existing zip.
        if os.path.exists(zip_path):
            os.remove(zip_path)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, dirs, files in os.walk(folder_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, folder_path)
                    zipf.write(file_path, arcname)
        return zip_path

    async def unzipOpt(self, input_path: str) -> str:
        """Process an OPT file, or every OPT file in a directory.

        :param input_path: a single .opt file path, or a directory of them
        :return: a human-readable result/report string
        :raises ValueError: when a single file does not end in .opt
        :raises FileNotFoundError: when the single file does not exist
        :raises RuntimeError: when processing a single file fails
        """
        input_path = os.path.abspath(input_path)
        # Directory: process every OPT inside and build a report.
        if os.path.isdir(input_path):
            success_list, fail_list = await self.process_directory(input_path)
            result = f"处理完成!\n成功处理 {len(success_list)} 个文件:"
            for file in success_list:
                result += f"\n - {file}"
            if fail_list:
                result += f"\n\n处理失败 {len(fail_list)} 个文件:"
                for error in fail_list:
                    result += f"\n - {error}"
            return result
        # Single file path.
        if not input_path.lower().endswith('.opt'):
            raise ValueError(f"文件扩展名必须是.opt: {input_path}")
        if not os.path.exists(input_path):
            raise FileNotFoundError(f"文件不存在: {input_path}")
        try:
            # 1) OPT -> VHD, 2) extract VHD, 3) rename dir, 4) zip it
            vhd_path = await self.process_with_unsega(input_path)
            extract_dir = await self.extract_vhd_with_7zip(vhd_path)
            final_dir = self._rename_extracted_dir(extract_dir)
            zip_path = await asyncio.to_thread(self._compress_folder_to_zip, final_dir)
            # 5) drop the intermediate VHD and the extracted directory
            os.remove(vhd_path)
            shutil.rmtree(final_dir, ignore_errors=True)
            return f"处理成功! 生成的ZIP文件: {zip_path}"
        except Exception as e:
            # Best-effort cleanup of whatever stage had been reached.
            if 'vhd_path' in locals() and os.path.exists(vhd_path):
                os.remove(vhd_path)
            if 'final_dir' in locals() and os.path.exists(final_dir):
                shutil.rmtree(final_dir, ignore_errors=True)
            if 'zip_path' in locals() and os.path.exists(zip_path):
                os.remove(zip_path)
            raise RuntimeError(f"处理失败: {str(e)}") from e

    async def process_directory(self, directory_path: str) -> tuple:
        """Process every *.opt file in *directory_path*.

        :param directory_path: directory containing OPT files
        :return: (list of produced zip paths, list of "<file>: <error>" strings)
        """
        opt_files = glob.glob(os.path.join(directory_path, "*.opt"))
        if not opt_files:
            return [], [f"目录中没有找到OPT文件: {directory_path}"]
        success_list = []
        fail_list = []
        for opt_file in opt_files:
            # glob can match directories named *.opt; skip those.
            if not os.path.isfile(opt_file):
                fail_list.append(f"{opt_file}: 不是文件")
                continue
            try:
                # Same four stages as the single-file path above.
                vhd_path = await self.process_with_unsega(opt_file)
                extract_dir = await self.extract_vhd_with_7zip(vhd_path)
                final_dir = self._rename_extracted_dir(extract_dir)
                zip_path = await asyncio.to_thread(self._compress_folder_to_zip, final_dir)
                os.remove(vhd_path)
                shutil.rmtree(final_dir, ignore_errors=True)
                success_list.append(zip_path)
            except Exception as e:
                fail_list.append(f"{opt_file}: {str(e)}")
        return success_list, fail_list
# Usage example
async def main():
    """Demonstrate single-file and whole-directory processing (sample paths)."""
    processor = OptProcessor()
    # Process a single file
    try:
        result = await processor.unzipOpt("/path/to/your/file.opt")
        print(result)
    except Exception as e:
        print(str(e))
    # Process an entire directory
    try:
        result = await processor.unzipOpt("/path/to/opt/directory")
        print(result)
    except Exception as e:
        print(str(e))


if __name__ == "__main__":
    asyncio.run(main())

23
Lib/downLib/__init__.py Normal file
View File

@ -0,0 +1,23 @@
import sys
sys.dont_write_bytecode = True
from .downloader import Downloader, download, downloadMany
from .exceptions import (
DownloadError, MaxRetriesExceeded,
InvalidURL, FileSizeMismatch, UnsupportedProtocol,
SkipDownload
)
# Public API of the downLib package.
__all__ = [
    "Downloader",
    "download",
    "downloadMany",
    "DownloadError",
    "MaxRetriesExceeded",
    "InvalidURL",
    "FileSizeMismatch",
    "UnsupportedProtocol",
    "SkipDownload"
]
__version__ = "1.2.0"

630
Lib/downLib/downloader.py Normal file
View File

@ -0,0 +1,630 @@
import sys
sys.dont_write_bytecode = True
import os
import asyncio
import time
import httpx
import aiofiles
import warnings
import tempfile
import shutil
from pathlib import Path
from tqdm import tqdm
from typing import Optional, List, Dict, Union, Callable, Awaitable, Tuple
from .exceptions import (
DownloadError, MaxRetriesExceeded,
InvalidURL, FileSizeMismatch, UnsupportedProtocol
)
from .utils import (
validate_url, get_filename_from_url,
sanitize_filename, format_size,
calculate_download_speed, get_file_extension,
clean_url, ask_overwrite, generate_unique_filename,
calculate_chunks
)
class Downloader:
    """Async HTTP downloader: retries with backoff, resume via Range headers,
    optional multi-connection ("multithread") ranged download, tqdm progress
    bars and configurable overwrite handling."""
    def __init__(
        self,
        max_workers: int = 5,
        max_retries: int = 3,
        chunk_size: int = 8192,
        timeout: int = 30,
        user_agent: str = "downLib/1.0",
        progress: bool = True,
        resume: bool = True,
        overwrite: bool = False,
        ask_overwrite: bool = False,
        auto_rename: bool = False,
        multithread: bool = True,
        threads: int = 4,
        min_size_for_multithread: int = 5 * 1024 * 1024,
        proxy: Optional[str] = None,
        headers: Optional[Dict] = None,
        callback: Optional[Callable[[Dict], Awaitable[None]]] = None,
        http2: bool = False,
        ask_retry: bool = True,  # ask the user whether to keep retrying on failure
        retry_timeout: int = 30  # seconds to wait for that answer before auto-retrying
    ):
        """
        Initialise the downloader.
        :param max_workers: maximum number of concurrent downloads
        :param max_retries: maximum retry count per file
        :param chunk_size: streaming chunk size in bytes
        :param timeout: request timeout in seconds
        :param user_agent: User-Agent string
        :param progress: show tqdm progress bars
        :param resume: resume partial downloads via Range requests
        :param overwrite: silently overwrite existing files
        :param ask_overwrite: interactively ask before overwriting
        :param auto_rename: rename to "name(N).ext" instead of overwriting
        :param multithread: enable multi-connection ranged download
        :param threads: number of connections for ranged download
        :param min_size_for_multithread: minimum file size (bytes) for ranged mode
        :param proxy: proxy URL (NOTE(review): httpx>=0.26 takes 'proxy';
            older versions expect 'proxies' — confirm the pinned httpx version)
        :param headers: extra request headers
        :param callback: async callable receiving a result dict per download
        :param http2: enable the HTTP/2 protocol (needs the 'h2' package)
        :param ask_retry: ask the user whether to keep retrying on failure
        :param retry_timeout: seconds before the retry prompt auto-continues
        """
        self.max_workers = max_workers
        self.max_retries = max_retries
        self.chunk_size = chunk_size
        self.timeout = timeout
        self.user_agent = user_agent
        self.progress = progress
        self.resume = resume
        self.overwrite = overwrite
        self.ask_overwrite = ask_overwrite
        self.auto_rename = auto_rename
        self.multithread = multithread
        self.threads = max(1, min(threads, 16))  # clamp thread count to 1..16
        self.min_size_for_multithread = min_size_for_multithread
        self.proxy = proxy
        self.headers = headers or {}
        self.callback = callback
        self.http2 = http2
        self.ask_retry = ask_retry
        self.retry_timeout = retry_timeout
        # Default User-Agent unless the caller supplied one.
        if "User-Agent" not in self.headers:
            self.headers["User-Agent"] = self.user_agent
        # Verify HTTP/2 support is actually installed.
        if self.http2:
            try:
                import h2
            except ImportError:
                warnings.warn(
                    "HTTP/2 requested but 'h2' package not installed. "
                    "Falling back to HTTP/1.1. Install with: pip install httpx[http2]",
                    RuntimeWarning
                )
                self.http2 = False
    async def download(
        self,
        url: str,
        save_path: Optional[Union[str, Path]] = None,
        custom_filename: Optional[str] = None,
        position: Optional[int] = None,  # tqdm progress-bar row
        **kwargs
    ) -> Path:
        """
        Download a single file.
        :param url: file URL
        :param save_path: save location (directory or full file path)
        :param custom_filename: filename override
        :param position: tqdm progress-bar row
        :return: full path of the downloaded file
            (NOTE(review): also returned unchanged on failure/skip — callers
            cannot distinguish failure from success by the return value alone)
        """
        # Clean illegal characters out of the URL first.
        original_url = url
        url = clean_url(url)
        if url != original_url and self.progress:
            print(f"警告: URL 已清理\n原始: {original_url}\n清理后: {url}")
        # Validate the (cleaned) URL.
        if not validate_url(url):
            raise InvalidURL(url)
        # Resolve target path and filename.
        if save_path is None:
            save_path = Path.cwd()
        else:
            save_path = Path(save_path)
        if save_path.is_dir():
            filename = custom_filename or get_filename_from_url(url)
            filename = sanitize_filename(filename)
            file_path = save_path / filename
        else:
            file_path = save_path
        # Make sure the destination directory exists.
        file_path.parent.mkdir(parents=True, exist_ok=True)
        # Existing-file state (initial_size feeds the resume logic).
        file_exists = file_path.exists()
        initial_size = file_path.stat().st_size if file_exists else 0
        # Overwrite policy for pre-existing files.
        if file_exists:
            # Silent-overwrite mode.
            if self.overwrite:
                initial_size = 0
                file_path.unlink()
                file_exists = False
                if self.progress:
                    print(f"已覆盖文件: {file_path}")
            # Auto-rename mode.
            elif self.auto_rename:
                new_file_path = generate_unique_filename(file_path)
                if self.progress:
                    print(f"文件已存在,自动重命名为: {new_file_path.name}")
                file_path = new_file_path
                file_exists = False
                initial_size = 0
            # Ask-the-user mode.
            elif self.ask_overwrite:
                if ask_overwrite(file_path, initial_size):
                    initial_size = 0
                    file_path.unlink()
                    file_exists = False
                    if self.progress:
                        print(f"覆盖文件: {file_path}")
                else:
                    # User declined: skip the download.
                    if self.progress:
                        print(f"跳过下载: {file_path}")
                    if self.callback:
                        result = {
                            "url": url,
                            "original_url": original_url,
                            "file_path": file_path,
                            "file_size": initial_size,
                            "download_time": 0,
                            "success": True,
                            "skipped": True,
                            "message": "文件已存在,跳过下载"
                        }
                        await self.callback(result)
                    return file_path
            # No overwrite and no prompt, but the file exists: skip.
            else:
                if self.progress:
                    print(f"文件已存在,跳过下载: {file_path}")
                if self.callback:
                    result = {
                        "url": url,
                        "original_url": original_url,
                        "file_path": file_path,
                        "file_size": initial_size,
                        "download_time": 0,
                        "success": True,
                        "skipped": True,
                        "message": "文件已存在,跳过下载"
                    }
                    await self.callback(result)
                return file_path
        # Download bookkeeping.
        retry_count = 0
        last_error = None
        start_time = time.time()
        # Per-file id used to pick a progress-bar row.
        file_id = hash(url) % 1000  # modulo keeps the value small
        # Default the bar row from the file id.
        if position is None:
            position = file_id % 5  # at most 5 bar rows
        # Create the bar up-front so errors can be written through it.
        progress_bar = None
        if self.progress:
            # Truncate long names; the `and` trick appends "..." only
            # when the name actually exceeds 20 chars.
            desc = file_path.name[:20] + (file_path.name[20:] and "...")
            progress_bar = tqdm(
                total=0,
                initial=0,
                unit="B",
                unit_scale=True,
                desc=desc,
                ncols=80,
                bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
                position=position,  # fixed row for this file
                leave=False  # drop the bar once the download finishes
            )
        while True:
            try:
                async with httpx.AsyncClient(
                    http2=self.http2,
                    timeout=self.timeout,
                    proxy=self.proxy,
                    headers=self.headers,
                    follow_redirects=True
                ) as client:
                    # Range header implements resume.
                    headers = {}
                    if self.resume and file_exists and initial_size > 0:
                        headers["Range"] = f"bytes={initial_size}-"
                    # HEAD request for size / range support.
                    head_resp = await client.head(url, headers=headers)
                    head_resp.raise_for_status()
                    # Does the server support byte ranges?
                    accept_ranges = head_resp.headers.get("Accept-Ranges", "none") == "bytes"
                    content_length = int(head_resp.headers.get("Content-Length", 0))
                    # No range support but a partial file on disk: restart from 0.
                    if not accept_ranges and initial_size > 0:
                        initial_size = 0
                        file_exists = False
                    # Total expected size (partial + remaining).
                    total_size = initial_size + content_length
                    # Eligibility for multi-connection download.
                    use_multithread = (
                        self.multithread and
                        accept_ranges and
                        total_size >= self.min_size_for_multithread
                    )
                    # Re-prime the progress bar with the real total.
                    if progress_bar:
                        progress_bar.reset(total=total_size)
                        progress_bar.update(initial_size)
                        # Refresh the description.
                        desc = file_path.name[:20] + (file_path.name[20:] and "...")
                        if use_multithread:
                            desc += f" [多线程:{self.threads}]"
                        progress_bar.set_description(desc)
                        # Announce multi-connection mode once.
                        if use_multithread:
                            progress_bar.write(f"启用多线程下载: {file_path.name} (使用 {self.threads} 个线程)")
                    # Dispatch to the right download strategy.
                    if use_multithread:
                        await self._multithread_download(
                            client, url, file_path, total_size, progress_bar
                        )
                    else:
                        await self._singlethread_download(
                            client, url, file_path, headers, progress_bar, initial_size
                        )
                    # Tear down the progress bar.
                    if progress_bar:
                        progress_bar.close()
                    # Verify the on-disk size against the expected total.
                    actual_size = file_path.stat().st_size
                    if total_size > 0 and actual_size != total_size:
                        raise FileSizeMismatch(total_size, actual_size)
                    # Elapsed wall-clock time.
                    download_time = time.time() - start_time
                    # Success callback.
                    if self.callback:
                        result = {
                            "url": url,
                            "original_url": original_url,
                            "file_path": file_path,
                            "file_size": actual_size,
                            "download_time": download_time,
                            "success": True,
                            "retries": retry_count,
                            "skipped": False,
                            "multithread": use_multithread,
                            "threads": self.threads if use_multithread else 1
                        }
                        await self.callback(result)
                    return file_path
            except (httpx.RequestError, httpx.HTTPStatusError, OSError) as e:
                last_error = e
                retry_count += 1
                # Out of retries?
                if retry_count >= self.max_retries:
                    # Failure callback.
                    if self.callback:
                        result = {
                            "url": url,
                            "original_url": original_url,
                            "file_path": file_path,
                            "error": str(last_error),
                            "success": False,
                            "retries": retry_count,
                            "skipped": False
                        }
                        await self.callback(result)
                    # Optionally ask the user whether to keep going.
                    if self.ask_retry and self.progress:
                        try:
                            # Surface the error above the bar.
                            if progress_bar:
                                progress_bar.clear()
                                progress_bar.write(f"下载失败: {str(e)}")
                                progress_bar.write(f"已达到最大重试次数 ({self.max_retries}次)")
                            # Ask on a worker thread so the loop is not blocked.
                            input_task = asyncio.create_task(
                                self._async_input(
                                    f"是否继续重试下载? (y/n) [{self.retry_timeout}秒后自动继续]: ",
                                    position
                                )
                            )
                            # Wait for the answer, or time out.
                            try:
                                response = await asyncio.wait_for(
                                    input_task,
                                    timeout=self.retry_timeout
                                )
                            except asyncio.TimeoutError:
                                response = None
                            if response and response.strip().lower() in ['y', 'yes']:
                                # User wants another round of retries.
                                if progress_bar:
                                    progress_bar.write("用户选择继续重试下载")
                                retry_count = 0  # reset the retry budget
                                continue
                            elif response and response.strip().lower() in ['n', 'no']:
                                # User gave up.
                                if progress_bar:
                                    progress_bar.write("用户选择放弃下载")
                                break
                            else:
                                # Timeout / invalid answer: keep retrying.
                                if progress_bar:
                                    progress_bar.write(f"{self.retry_timeout}秒未响应,自动继续重试")
                                retry_count = 0  # reset the retry budget
                                continue
                        except Exception as e:
                            # Prompt failed: keep retrying anyway.
                            if progress_bar:
                                progress_bar.write(f"输入处理出错: {str(e)},自动继续重试")
                            retry_count = 0  # reset the retry budget
                            continue
                    else:
                        # Non-interactive: give up.
                        break
                else:
                    # Retries left: exponential backoff.
                    wait_time = 2 ** retry_count
                    if progress_bar:
                        progress_bar.write(f"下载出错: {str(e)},将在 {wait_time} 秒后重试 (尝试 {retry_count}/{self.max_retries})")
                    await asyncio.sleep(wait_time)
        # All retries exhausted or the user gave up.
        if progress_bar:
            progress_bar.close()
        # Failure still returns the target path (see NOTE above).
        return file_path
    async def _async_input(self, prompt: str, position: int) -> str:
        """Read a line from stdin without blocking the event loop."""
        # Print the prompt without corrupting active tqdm bars.
        # NOTE(review): tqdm.external_write_mode() takes (file, nolock);
        # passing position= may raise TypeError — confirm against the
        # installed tqdm version.
        with tqdm.external_write_mode(position=position):
            print(prompt, end='', flush=True)
        # Run blocking input() on the default executor thread.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, input)
    async def _singlethread_download(
        self,
        client: httpx.AsyncClient,
        url: str,
        file_path: Path,
        headers: Dict,
        progress_bar: Optional[tqdm],
        initial_size: int
    ):
        """Single-connection streaming download (appends when resuming)."""
        async with client.stream(
            "GET", url, headers=headers
        ) as response:
            response.raise_for_status()
            # Append when resuming a partial file, else write fresh.
            mode = "ab" if initial_size > 0 else "wb"
            async with aiofiles.open(file_path, mode) as f:
                downloaded = initial_size
                async for chunk in response.aiter_bytes(chunk_size=self.chunk_size):
                    await f.write(chunk)
                    downloaded += len(chunk)
                    # Advance the progress bar.
                    if progress_bar:
                        progress_bar.update(len(chunk))
    async def _multithread_download(
        self,
        client: httpx.AsyncClient,
        url: str,
        file_path: Path,
        total_size: int,
        progress_bar: Optional[tqdm]
    ):
        """Ranged multi-connection download: fetch chunks concurrently into a
        temp dir, then concatenate them into *file_path*."""
        # Scratch directory for the chunk files.
        temp_dir = Path(tempfile.mkdtemp(prefix="downlib_"))
        try:
            # Chunk size: at least 100x the stream chunk, about one per thread.
            chunk_size = max(self.chunk_size * 100, total_size // self.threads)
            chunks = calculate_chunks(total_size, chunk_size)
            # One task per byte range.
            tasks = []
            for i, (start, end) in enumerate(chunks):
                temp_file = temp_dir / f"chunk_{i}.part"
                tasks.append(
                    self._download_chunk(client, url, temp_file, start, end, progress_bar)
                )
            # Fetch all ranges concurrently.
            await asyncio.gather(*tasks)
            # Stitch the parts together in order.
            async with aiofiles.open(file_path, "wb") as outfile:
                for i in range(len(chunks)):
                    temp_file = temp_dir / f"chunk_{i}.part"
                    async with aiofiles.open(temp_file, "rb") as infile:
                        while True:
                            chunk = await infile.read(self.chunk_size)
                            if not chunk:
                                break
                            await outfile.write(chunk)
        finally:
            # Always remove the scratch directory.
            shutil.rmtree(temp_dir, ignore_errors=True)
    async def _download_chunk(
        self,
        client: httpx.AsyncClient,
        url: str,
        temp_file: Path,
        start: int,
        end: int,
        progress_bar: Optional[tqdm]
    ):
        """Download the inclusive byte range [start, end] into *temp_file*."""
        headers = {"Range": f"bytes={start}-{end}"}
        async with client.stream(
            "GET", url, headers=headers
        ) as response:
            response.raise_for_status()
            # Stream the range into its own part file.
            async with aiofiles.open(temp_file, "wb") as f:
                async for chunk in response.aiter_bytes(chunk_size=self.chunk_size):
                    await f.write(chunk)
                    # Advance the shared progress bar.
                    if progress_bar:
                        progress_bar.update(len(chunk))
    async def downloadMany(
        self,
        urls: List[str],
        save_dir: Optional[Union[str, Path]] = None,
        custom_filenames: Optional[List[str]] = None
    ) -> List[Path]:
        """
        Download several files concurrently (bounded by max_workers).
        :param urls: list of URLs
        :param save_dir: target directory (defaults to the CWD)
        :param custom_filenames: optional per-URL filename overrides
        :return: list of resulting file paths, in input order
        """
        if save_dir is None:
            save_dir = Path.cwd()
        else:
            save_dir = Path(save_dir)
        # Build one download coroutine per URL.
        tasks = []
        # Assign each task a fixed progress-bar row (max 5 rows).
        positions = [i % min(len(urls), 5) for i in range(len(urls))]
        for i, url in enumerate(urls):
            filename = custom_filenames[i] if custom_filenames and i < len(custom_filenames) else None
            tasks.append(self.download(
                url,
                save_dir,
                filename,
                position=positions[i]  # pin the bar row
            ))
        # Bound concurrency with a semaphore.
        semaphore = asyncio.Semaphore(self.max_workers)
        async def limited_task(task):
            async with semaphore:
                return await task
        # Run everything.
        results = await asyncio.gather(
            *(limited_task(task) for task in tasks),
            return_exceptions=False
        )
        # Collect the resulting paths.
        downloaded_files = []
        for result in results:
            downloaded_files.append(result)
        # Print a summary once every download has finished.
        if self.progress:
            print("\n下载总结:")
            for path in downloaded_files:
                if path.exists():
                    size = path.stat().st_size
                    print(f"{path.name} ({format_size(size)})")
                else:
                    print(f"{path.name} (下载失败)")
        return downloaded_files
    async def downloadWithProgress(
        self,
        url: str,
        save_path: Optional[Union[str, Path]] = None,
        custom_filename: Optional[str] = None
    ) -> Path:
        """Download one file with the progress bar forced on (wrapper)."""
        # Temporarily force self.progress on, restoring it afterwards.
        original_progress = self.progress
        self.progress = True
        try:
            return await self.download(url, save_path, custom_filename)
        finally:
            self.progress = original_progress
# Convenience wrappers
async def download(
    url: str,
    save_path: Optional[Union[str, Path]] = None,
    custom_filename: Optional[str] = None,
    **kwargs
) -> Path:
    """One-shot download: build a throwaway Downloader from **kwargs and
    fetch a single file."""
    return await Downloader(**kwargs).download(url, save_path, custom_filename)
async def downloadMany(
    urls: List[str],
    save_dir: Optional[Union[str, Path]] = None,
    custom_filenames: Optional[List[str]] = None,
    **kwargs
) -> List[Path]:
    """One-shot concurrent download of several files via a throwaway
    Downloader built from **kwargs."""
    return await Downloader(**kwargs).downloadMany(urls, save_dir, custom_filenames)

33
Lib/downLib/exceptions.py Normal file
View File

@ -0,0 +1,33 @@
# exceptions.py (添加跳过下载异常)
import sys
sys.dont_write_bytecode = True
class DownloadError(Exception):
    """Base class for all downLib download errors."""
    pass
class MaxRetriesExceeded(DownloadError):
    """Raised when a URL fails more times than the configured retry limit."""
    def __init__(self, url, max_retries):
        super().__init__(f"URL '{url}' 下载失败,超过最大重试次数 {max_retries}")
class InvalidURL(DownloadError):
    """Raised for URLs that fail validation."""
    def __init__(self, url):
        super().__init__(f"无效的URL: '{url}'")
class FileSizeMismatch(DownloadError):
    """Raised when the downloaded size differs from the expected size."""
    def __init__(self, expected, actual):
        super().__init__(f"文件大小不匹配: 期望 {expected} 字节, 实际 {actual} 字节")
class UnsupportedProtocol(DownloadError):
    """Raised for URL schemes the downloader does not handle."""
    def __init__(self, protocol):
        super().__init__(f"不支持的协议: '{protocol}'")
class SkipDownload(DownloadError):
    """Raised/recorded when the user chooses to skip an existing file."""
    def __init__(self, file_path):
        super().__init__(f"用户跳过下载: {file_path}")

126
Lib/downLib/utils.py Normal file
View File

@ -0,0 +1,126 @@
# utils.py (添加唯一文件名生成功能)
import sys
sys.dont_write_bytecode = True
import os
import re
import math
from pathlib import Path
from urllib.parse import urlparse, unquote, quote
def validate_url(url: str) -> bool:
    """Return True when *url* normalises to something with a scheme and host."""
    try:
        # Decode then re-encode to normalise any pre-encoded input.
        decoded_url = unquote(url)
        encoded_url = quote(decoded_url, safe='/:?=&')
        result = urlparse(encoded_url)
        return all([result.scheme, result.netloc])
    except Exception:  # was a bare except; any parse error means "invalid"
        return False
def clean_url(url: str) -> str:
    """
    Normalise *url*:
    1. strip non-printable ASCII (control) characters,
    2. normalise percent-encoding,
    3. re-encode spaces and other unsafe characters.
    Returns the original string unchanged if normalisation fails.
    """
    try:
        # Decode first so existing escapes are not double-encoded.
        decoded_url = unquote(url)
        # Remove control characters (ASCII 0-31 and 127).
        cleaned_url = re.sub(r'[\x00-\x1F\x7F]', '', decoded_url)
        # Re-encode everything except URL structure characters.
        return quote(cleaned_url, safe='/:?=&')
    except Exception:  # was a bare except; fall back to the raw URL
        return url
def get_filename_from_url(url: str) -> str:
    """Extract the basename of the URL path; default to 'downloaded_file'."""
    name = os.path.basename(urlparse(url).path)
    return name if name else "downloaded_file"
def sanitize_filename(filename: str) -> str:
    """Replace characters that are illegal in Windows filenames with '_'."""
    illegal = '\\/*?:"<>|'
    return filename.translate(str.maketrans(illegal, "_" * len(illegal)))
def get_file_extension(filename: str) -> str:
    """Return the lowercase extension of *filename*, dot included."""
    _, ext = os.path.splitext(filename)
    return ext.lower()
def format_size(size_bytes: int) -> str:
    """Human-readable size: plain bytes below 1 KiB, otherwise two decimals
    in KB, MB or GB."""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    for unit, scale in (("KB", 1024), ("MB", 1024**2), ("GB", 1024**3)):
        if size_bytes < scale * 1024 or unit == "GB":
            return f"{size_bytes / scale:.2f} {unit}"
def calculate_download_speed(start_time, end_time, downloaded_bytes):
    """Average transfer speed over the interval, formatted like '1.23 MB/s'."""
    elapsed = end_time - start_time
    if elapsed < 0.001:  # guard against zero/negative durations
        elapsed = 0.001
    return f"{format_size(downloaded_bytes / elapsed)}/s"
def ask_overwrite(file_path: Path, file_size: int) -> bool:
    """
    Interactively ask whether to overwrite an existing file.
    :param file_path: path of the existing file
    :param file_size: its size in bytes (shown to the user)
    :return: True to overwrite, False to skip
    """
    print(f"\n文件已存在: {file_path}")
    print(f"文件大小: {format_size(file_size)}")
    # Loop until a recognised answer is given.
    while True:
        response = input("是否覆盖? (y/n): ").strip().lower()
        if response in ['y', 'yes']:
            return True
        elif response in ['n', 'no']:
            return False
        else:
            print("无效输入,请输入 'y''n'")
def generate_unique_filename(file_path: Path) -> Path:
    """
    Return *file_path* if it is free, otherwise the first 'name(N).ext'
    variant in the same directory that does not exist yet.
    :param file_path: the requested file path
    :return: a path that does not currently exist
    """
    stem, suffix = file_path.stem, file_path.suffix
    candidate = file_path
    n = 1
    while candidate.exists():
        # Append a counter to the original stem, e.g. file(1).txt
        candidate = file_path.parent / f"{stem}({n}){suffix}"
        n += 1
    return candidate
def calculate_chunks(total_size: int, chunk_size: int) -> list[tuple[int, int]]:
    """
    Split [0, total_size) into inclusive (start, end) byte ranges.
    :param total_size: total file size in bytes
    :param chunk_size: maximum bytes per range
    :return: list of (start, end) tuples; empty when total_size is 0
    """
    return [
        (start, min(start + chunk_size, total_size) - 1)
        for start in range(0, total_size, chunk_size)
    ]

BIN
Lib/unsega.exe Normal file

Binary file not shown.

51
README.MD Normal file
View File

@ -0,0 +1,51 @@
# maiTec 项目
## 项目简介
maiTec 是一个基于 Python 开发的舞萌科技项目
## 主要功能
- opt下载和解包
- 超好用的下载器(?
- 更新中
## 环境依赖
请确保已安装以下依赖包:
```
pip install -r requirements.txt
```
## 项目结构
```
maiTec/
├── main.py # 主函数入口
├── requirements.txt # 依赖包列表
└── README.md # 项目说明文档
```
## 使用方法
1. 克隆本仓库到本地:
```
git clone <仓库地址>
```
2. 安装依赖:
```
pip install -r requirements.txt
```
3. 运行主程序:
```
python main.py
```
## 贡献方式
欢迎提交 issue 或 pull request 参与项目改进!
## 许可证
本项目采用 MIT 许可证。

1
RunMeFirst.bat Normal file
View File

@ -0,0 +1 @@
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

51
main.py Normal file
View File

@ -0,0 +1,51 @@
import sys
sys.dont_write_bytecode = True
from Lib.OPT.SDGBauth import getOptUrl
from Lib.downLib import download, downloadMany
from Lib.decryptOpt import OptProcessor
import configparser
import asyncio
import os
import subprocess
import glob
import time
class OptionGet:
    """Fetches the option delivery order file and downloads the opt packages it lists."""

    async def downloadOrder(self):
        """Resolve the option order URL and save it as ./order/order.ini."""
        order_url = getOptUrl()
        print("获取的option地址:", order_url)
        await download(order_url, save_path="./order/order.ini", custom_filename="order.ini")

    async def downLoadOpt(self, default="old"):
        """Download opt packages according to ./order/order.ini.

        :param default: "old" downloads only the [OPTIONAL] entries,
            "new" downloads only the [COMMON] INSTALL1 package,
            any other value downloads both.
        """
        parser = configparser.ConfigParser()
        parser.read("./order/order.ini", encoding="utf-8")
        # Map every [OPTIONAL] entry name to its download URL.
        optUrlDict = {entry: link for entry, link in parser["OPTIONAL"].items()}
        urlList = list(optUrlDict.values())
        newUrl = parser["COMMON"]["INSTALL1"]
        if default == "old":
            await downloadMany(urlList, save_dir="./opt")
        elif default == "new":
            await download(newUrl, save_path="./opt")
        else:
            urlList.append(newUrl)
            print("下载列表:", urlList)
            await downloadMany(urlList, save_dir="./opt")
async def getOptImg():
    """End-to-end pipeline: fetch the order file, download all opts, then unpack them."""
    unpacker = OptProcessor()
    fetcher = OptionGet()
    await fetcher.downloadOrder()
    # "all" → download both the optional packages and the COMMON install image.
    await fetcher.downLoadOpt("all")
    await unpacker.unzipOpt("./opt")
if __name__ == "__main__":
    # Script entry point: run the full order-fetch / download / unpack pipeline.
    asyncio.run(getOptImg())

Binary file not shown.

Binary file not shown.

57
order/order.ini Normal file
View File

@ -0,0 +1,57 @@
;-----------------------------------------
; オプションイメージ配信指示書ソース
; 本番環境用
;-----------------------------------------
[COMMON]
;-----------------------------------------
; 配信開始日と公開日
;-----------------------------------------
ORDER_TIME=2025-06-21T07:00:00
RELEASE_TIME=2025-06-23T07:00:00
; 指示書設置時点で過去の日時になっていればよいが、
; 筺体が意図した配信指示書を受け取っているかの確認を
; テストモードの「ALL.Netダウンロード状況」で
; ここで設定した日時になっているかで行うために
; 日付をチェック当日に変更する。
; 時刻は00:00にする。識別するために開発用は01:01
;-----------------------------------------
; ディスクリプション
;-----------------------------------------
GAME_DESC="OPTION_CN150_A007"
; 筐体ごとの配信状況のレポートの「指示書説明」欄に表示される文字列。
; 配信指示書を識別できれば何でもよい。基本は以下のフォーマットとする。
; <バージョン>_AXXX_YYMMDD
;-----------------------------------------
; 受信サイズ(配信速度)
;-----------------------------------------
; +-- 1回(1秒)あたりの受信サイズ
; | 本番で設定可能な値は以下
; | 8192 = 8KB/sec = 64Kbps(通常配信)
; | 16384 = 16KB/sec = 128Kbps(2倍速配信)
; | 32768 = 32KB/sec = 256kbps(4倍速配信)
; v
PART_SIZE=2048,8192,8192
;-----------------------------------------
;ここから下は基本的に変更する必要は無い
;-----------------------------------------
DLFORMAT=5.00
GAME_ID=SDGB
INTERVAL=5000,10000,15000,20000
DSL_INTERVAL=2500,5000,7500,10000
BB_INTERVAL=1000,2000,3000,4000
CLOUD=000000000000000000000000000000000000000000000000
REPORT=http://at.sys-allnet.cn/report-api/Report
REPORT_INTERVAL=3600
;-----------------------------------------
RELEASE_TYPE=1
IMMEDIATELY_RELEASE=0
INSTALL1=https://maimai-haisin.wahlap.com/download/chn/150/option/SDGB_A007_20250619173010_0.opt
[OPTIONAL]
INSTALL1=https://maimai-haisin.wahlap.com/download/chn/150/option/SDGB_A005_20250603074349_0.opt
INSTALL2=https://maimai-haisin.wahlap.com/download/chn/150/option/SDGB_A006_20250612160242_0.opt

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
aiofiles==24.1.0
h2==4.2.0
httpx==0.28.1
loguru==0.7.3
pycryptodome==3.23.0
tqdm==4.67.1

80
zip.PY Normal file
View File

@ -0,0 +1,80 @@
import json
import os
import subprocess
import datetime
import sys
def _load_version(config_path):
    """Read the "version" field from the JSON config, trying common encodings.

    :param config_path: path to Config.json
    :return: the version string
    :raises ValueError: if the file cannot be decoded/parsed in any known
        encoding, or if it parses but lacks a "version" field
    """
    # Read raw bytes first so we can probe several encodings.
    with open(config_path, 'rb') as f:
        raw_data = f.read()
    for encoding in ['utf-8', 'gbk', 'latin-1']:
        try:
            config = json.loads(raw_data.decode(encoding))
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue
        # Decoded and parsed successfully: a missing field is a config error,
        # not an encoding problem, so report it as such instead of retrying
        # other encodings (the old code fell through to a misleading
        # "cannot determine encoding" error here).
        version = config.get("version")
        if not version:
            raise ValueError("配置文件中未找到 version 字段")
        return version
    raise ValueError("无法确定文件编码")
def main():
    """Package the project directory into a versioned, timestamped ZIP via the bundled 7z."""
    # The bundled 7-Zip binary is required; bail out early if it is missing.
    seven_zip_path = os.path.join("Lib", "7z", "7z.exe")
    if not os.path.exists(seven_zip_path):
        print(f"错误: 7z工具未在 {seven_zip_path} 找到")
        sys.exit(1)
    config_path = "./DataBase/Config.json"
    if not os.path.exists(config_path):
        print(f"错误: 配置文件 {config_path} 不存在")
        sys.exit(1)
    try:
        version = _load_version(config_path)
    except Exception as e:
        print(f"读取配置文件失败: {str(e)}")
        sys.exit(1)
    # Archive name carries version + timestamp, e.g. MaiTec_Ver1.2.4_20250627_185700.zip
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_name = f"MaiTec_Ver{version}_{timestamp}.zip"
    # 7z invocation: add ("a") the current directory into a ZIP container.
    cmd = [
        seven_zip_path,
        "a",        # add files to archive
        "-tzip",    # force ZIP output format
        zip_name,
        ".",        # archive the current directory
    ]
    try:
        print(f"开始压缩: {zip_name}...")
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors='replace',
            check=True,
        )
        print("压缩成功完成!")
        print(f"输出信息: {result.stdout}")
    except subprocess.CalledProcessError as e:
        print(f"压缩失败: {e.stderr}")
        sys.exit(1)
    except Exception as e:
        print(f"执行过程中出错: {str(e)}")
        sys.exit(1)
if __name__ == "__main__":
    # Allow running the packer directly as a script.
    main()