This commit is contained in:
Bennett 2025-06-20 22:29:24 +08:00
parent bd69ee2b9c
commit 2603f5e04a
10 changed files with 435 additions and 30 deletions

View File

@ -1,7 +1,6 @@
import subprocess
import sys
from config import mcc
class MaichartConverterFailed(BaseException):
def __str__(self):
return "Maichart conversion failed."
@ -9,7 +8,7 @@ class MaichartConverterFailed(BaseException):
def ma2tosimai(ma2path):
cmd = [
"mcc/MaichartConverter","CompileMa2","-p",
mcc,"CompileMa2","-p",
ma2path,
"-f","SimaiFes"
]

View File

@ -1,5 +1,4 @@
import UnityPy
from PIL import Image
def convert_ab_to_png(assetbundle_path, output_png_path):

View File

@ -1,4 +1,3 @@
import os
import sys
from pathlib import Path
from subprocess import run

4
config.py Normal file
View File

@ -0,0 +1,4 @@
streaming_assets = "/Users/bennett/Downloads/SDEZ/Package/Sinmai_Data/StreamingAssets"
output_folder = "result"
mcc = "MaichartConverter"
work = "/Users/bennett/PJCK/CCC/work"

View File

@ -78,7 +78,13 @@ def convert_to_simai_folder(result,output_folder):
for path in ma2_list:
filename = os.path.basename(path)
try:
num = int(filename[-6:-4]) # 提取 _00 → 0
fn = filename[-6:-4]
if fn is "_L":
num = 0
elif fn is "_R":
num = 1
else:
num = int(fn) # 提取 _00 → 0
level = num + 2 # 转换为 Simai 难度等级
convert_results[level] = ma2tosimai(path)
except Exception as e:
@ -136,8 +142,8 @@ def convert_to_simai_folder(result,output_folder):
# 示例调用
if __name__ == "__main__":
music_ids = [834,799]
output_folder = "result"
music_ids = [643]
from config import output_folder
max_workers = 6 # 根据 CPU 和硬盘负载合理设置线程数
with ThreadPoolExecutor(max_workers=max_workers) as executor:

65
findsong.py Normal file
View File

@ -0,0 +1,65 @@
from difflib import SequenceMatcher
import json
sim_num = 0.5
def deduplicate_by_songid(matches):
unique = {}
for item in matches:
sid = item['SongID']
if sid not in unique:
unique[sid] = item
else:
# 比较分数,保留分数更高的
if item['Score'] > unique[sid]['Score']:
unique[sid] = item
return list(unique.values())
def similarity(a, b):
return SequenceMatcher(None, a, b).ratio()
def find_song_id(name_or_id):
songs = json.loads(open('alias.json','r',encoding='utf-8').read())['content']
name_lower = str(name_or_id).strip().lower()
# 模糊匹配(>= sim_num返回列表
fuzzy_matches = []
for song in songs:
sim_name = similarity(name_lower, song["Name"].lower())
if sim_name >= sim_num:
fuzzy_matches.append({
"Name": song["Name"],
"MatchedWith": song["Name"],
"SongID": song["SongID"],
"Score": sim_name
})
for alias in song["Alias"]:
sim_alias = similarity(name_lower, alias.lower())
if sim_alias >= sim_num:
fuzzy_matches.append({
"Name": song["Name"],
"MatchedWith": alias,
"SongID": song["SongID"],
"Score": sim_alias
})
sim_alias = similarity(str(song["SongID"]), name_lower)
if sim_alias >= 1:
fuzzy_matches.append({
"Name": song["Name"],
"MatchedWith": str(song["SongID"]),
"SongID": song["SongID"],
"Score": sim_alias
})
# 按相似度排序,返回列表或者空字典
fuzzy_matches.sort(key=lambda x: x["Score"], reverse=True)
result = deduplicate_by_songid(fuzzy_matches)
return {'length': len(fuzzy_matches), 'result': result}
# 测试调用
if __name__ == '__main__':
query = input("请输入搜索内容: ")
result = find_song_id(query)
print(result)

View File

@ -0,0 +1,330 @@
import random
import string
import threading
import time
from pathlib import Path
import shutil
from MaichartConverter import ma2tosimai
from ab2png import convert_ab_to_png
from acb2mp3 import convert_awb_to_wav, convert_wav_to_mp3
from config import work as work_dir_path
from search import search_music_by_id
from pv_decode import dat_to_mp4
from loguru import logger
from concurrent.futures import ThreadPoolExecutor, as_completed
from flask import Flask, request, send_file, jsonify, after_this_request, abort, render_template, send_from_directory
import os
from flask_cors import CORS, cross_origin
from findsong import find_song_id
app = Flask(__name__)
CORS(app)
# 假设你已实现以下函数
tasks = {}
tasks_lock = threading.Lock()
executor = ThreadPoolExecutor(max_workers=4)
def random_mid(length=16):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def process_audio(awb_path, output_dir: Path):
wav_path = output_dir / "temp.wav"
mp3_path = output_dir / "track.mp3"
convert_awb_to_wav(awb_path, wav_path)
convert_wav_to_mp3(wav_path, mp3_path)
if wav_path.exists():
wav_path.unlink() # 删除临时 wav 文件
def build_maidata_txt(
title: str = "",
freemsg: str = "",
bpm: str = "",
first_notes: dict = None, # {2: 0.123, 3: ..., ...}
levels: dict = None, # {2: "3", 3: "5", ...}
designers: dict = None, # {2: "作者A", 3: ..., ...}
charts: dict = None, # {2: "谱面数据\n...", ...}
levelnum: int = None,
) -> str:
# 限制 levelnum 在 [2, 6]
levelnum = max(2, min(levelnum, 6))
maidata = [
f"&title={title}",
f"&artist={freemsg}",
f"&wholebpm={bpm}",
"&first=0"
]
for i in range(2, levelnum + 2):
first = f"{first_notes.get(i):.3f}" if first_notes and i in first_notes else ""
maidata.append(f"&first_{i}={first}")
for i in range(2, levelnum + 2):
lv = levels.get(i, "") if levels else ""
des = designers.get(i, "") if designers else ""
maidata.append(f"&lv_{i}={lv}")
maidata.append(f"&des_{i}={des}")
for i in range(2, levelnum + 2):
chart = charts.get(i, "") if charts else ""
maidata.append(f"&inote_{i}=")
maidata.append(chart.strip())
maidata.append("&amsg_time=")
maidata.append("&amsg_content=")
return "\n".join(maidata)
def convert_to_simai_folder(result, output_folder):
from concurrent.futures import ThreadPoolExecutor
import zipfile
info = result[0]
music_id = info[0]
name = info[1]
artist = info[2]
designers = {i + 2: item["designer"] for i, item in enumerate(info[3])}
levels = {i + 2: item["levelshow"] for i, item in enumerate(info[3])}
ma2_list = result[1]
ab_file = result[2]
acb_list = result[3]
awb_file = next((f for f in acb_list if f.endswith('.awb')), None)
dat_file = result[4]
version_name = result[5]
if not all([name, artist, designers, levels, ma2_list, ab_file, awb_file, dat_file, version_name]):
logger.warning(f"[{music_id}] 信息不完整,跳过")
return None
work = Path(work_dir_path) # 如果之前是字符串
work.mkdir(parents=True, exist_ok=True) # 先创建 work 目录
work_dir = work / music_id
work_dir.mkdir(parents=True, exist_ok=True) # 再创建子目录
# 1. 并行执行数据转换
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {
"ab": executor.submit(convert_ab_to_png, ab_file, work_dir / "bg.png"),
"dat": executor.submit(dat_to_mp4, dat_file, music_id),
"audio": executor.submit(process_audio, awb_file, work_dir)
}
mp4_path = futures["dat"].result()
audio_mp3 = work_dir / "track.mp3"
# 2. 处理谱面并生成 maidata.txt
convert_results = {}
for path in ma2_list:
filename = os.path.basename(path)
try:
num = int(filename[-6:-4]) # 提取 _00 → 0
level = num + 2
convert_results[level] = ma2tosimai(path)
except Exception as e:
logger.error(f"处理 {filename} 时出错: {e}")
maidata_txt = build_maidata_txt(
title=name,
freemsg=artist,
bpm="",
levels=levels,
designers=designers,
charts=convert_results,
levelnum=len(ma2_list),
)
with open(work_dir / "maidata.txt", "w", encoding="utf-8") as f:
f.write(maidata_txt)
# 3. 生成 zip 文件
zip_path = Path(output_folder) / f"{name}.zip"
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for file in ["bg.png", "maidata.txt", "track.mp3"]:
fpath = work_dir / file
if fpath.exists():
zipf.write(fpath, arcname=f"{name}/{file}")
else:
logger.warning(f"{fpath} 不存在,跳过")
if mp4_path and os.path.exists(mp4_path):
zipf.write(mp4_path, arcname=f"{name}/pv.mp4")
logger.success(f"[{music_id}] 打包完成:{zip_path}")
shutil.rmtree(work_dir)
if mp4_path and os.path.exists(mp4_path):
os.remove(mp4_path)
return zip_path
def task_runner(mid, music_id):
with tasks_lock:
tasks[mid]['status'] = 'CV' # 转换中
try:
music_data = search_music_by_id(music_id)
if music_data is None:
raise Exception(f"music_id {music_id} 未找到")
zip_path = convert_to_simai_folder(music_data, "result")
with tasks_lock:
tasks[mid]['status'] = 'OK'
tasks[mid]['zip_path'] = zip_path
except Exception as e:
with tasks_lock:
tasks[mid]['status'] = 'NG'
tasks[mid]['msg'] = str(e)
logger.error(f"任务 {mid} 失败: {e}")
raise e
@app.route('/UpsertConvert', methods=['POST'])
def upsert_convert():
data = request.get_json()
if not data or 'music_id' not in data:
return jsonify({"error": "缺少 music_id"}), 400
music_id = str(data['music_id'])
# 每次都分配新的 mid
mid = random_mid()
with tasks_lock:
tasks[mid] = {
'status': 'CV',
'music_id': music_id,
'zip_path': None,
'msg': None,
}
executor.submit(task_runner, mid, music_id)
return jsonify({"status": "success", "mid": mid})
@app.route('/GetConvertStatus', methods=['POST'])
def get_convert_status():
data = request.get_json()
if not data or 'mid' not in data:
return jsonify({"error": "缺少 mid"}), 400
mid = data['mid']
with tasks_lock:
info = tasks.get(mid)
if not info:
return jsonify({"error": "任务不存在"}), 404
status = info['status']
if status == 'CV':
# 模拟估算时间比如固定5秒
return jsonify({"status": "CV", "mid": mid})
elif status == 'OK':
return jsonify({"status": "OK", "mid": mid})
else:
# NG失败
return jsonify({"status": "NG", "mid": mid, "msg": info.get('msg', '')})
def delayed_delete(path, delay=300):
def delete_file():
time.sleep(delay)
if os.path.exists(path):
try:
os.remove(path)
logger.info(f"延迟 {delay}s 后已删除 zip 文件: {path}")
except Exception as e:
logger.error(f"延迟删除失败: {e}")
threading.Thread(target=delete_file, daemon=True).start()
@app.route('/GetConvertZip', methods=['POST', 'GET'])
def get_convert_zip():
mid = request.form.get('mid') or request.args.get('mid')
if not mid:
json_data = request.get_json(silent=True)
if json_data:
mid = json_data.get('mid')
if not mid:
return jsonify({"error": "缺少 mid 参数"}), 400
with tasks_lock:
info = tasks.get(mid)
if not info:
return jsonify({"error": "任务不存在"}), 404
if info['status'] == 'NG':
return jsonify({"error": "任务失败,无法下载"}), 500
if info['status'] != 'OK':
return jsonify({"error": "任务未完成"}), 400
zip_path = info.get('zip_path')
if not zip_path or not os.path.isfile(zip_path):
return jsonify({"error": "zip 文件不存在"}), 404
delayed_delete(zip_path, delay=300)
return send_file(zip_path, as_attachment=True, download_name=os.path.basename(zip_path))
@app.route('/')
def index():
#<h2>Mari我想开大运创似你</h2>
return """<h1>It works!</h1>
<!-- Mari我想开大运创似你 -->"""
@app.route('/search',methods=['GET'])
def search():
text = request.args.get('text')
return find_song_id(text)
@app.route('/covers/<int:N>.png')
def get_cover(N):
# 构建图像文件名
filename = f'{N}.png'
# 定义图像文件存储的目录
covers_dir = "/Users/bennett/LingtuBot/NaiiBot/stote/static/mai/cover"
# 检查文件是否存在
if os.path.exists(os.path.join(covers_dir, filename)):
# 返回图像文件
return send_from_directory(covers_dir, filename)
else:
# 如果文件不存在,返回 404 错误
abort(404)
if __name__ == '__main__':
if not os.path.exists("result"):
os.makedirs("result")
app.run(debug=True, host='0.0.0.0',port=16828)

View File

@ -51,21 +51,21 @@ def process_video(input_file, output_file):
target_size, pad_x, pad_y = calculate_padding(width, height)
if pad_x == 0 and pad_y == 0:
logger.info(f"视频比例是 {ratio_to_str(width,height)},无需填充。")
logger.info(f"视频比例是 {ratio_to_str(width, height)},无需填充。")
ffmpeg_cmd = [
"ffmpeg", "-i", input_file,
"ffmpeg", "-y", "-i", input_file,
"-c:v", "h264_videotoolbox",
"-b:v", "50M", "-maxrate", "70M", "-bufsize", "100M",
"-b:v", "5M", "-maxrate", "70M", "-bufsize", "100M",
"-c:a", "aac", "-b:a", "320k",
output_file
]
else:
logger.info(f"视频比例是 {ratio_to_str(width,height)},填充黑边,使视频变为 {target_size}x{target_size}")
logger.info(f"视频比例是 {ratio_to_str(width, height)},填充黑边,使视频变为 {target_size}x{target_size}")
ffmpeg_cmd = [
"ffmpeg", "-i", input_file,
"ffmpeg", "-y", "-i", input_file,
"-vf", f"pad={target_size}:{target_size}:{pad_x}:{pad_y}:black",
"-c:v", "h264_videotoolbox",
"-b:v", "50M", "-maxrate", "70M", "-bufsize", "100M",
"-b:v", "5M", "-maxrate", "70M", "-bufsize", "100M",
"-c:a", "aac", "-b:a", "320k",
output_file
]

View File

@ -1,8 +1,11 @@
import os
import subprocess
from pathlib import Path
from pv_convert import process_video
from loguru import logger
from pv_convert import process_video
from config import work as work_dirs
# === 配置 ===
WANNACRI_PATH = "wannacri" # 需在 PATH 或填写完整路径
FFMPEG_PATH = "ffmpeg" # 需在 PATH 或填写完整路径
@ -27,62 +30,62 @@ def get_video_duration(path: Path) -> float:
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", str(path)],
"-of", "default=nokey=1:noprint_wrappers=1", str(path)],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
return float(result.stdout.strip())
except Exception as e:
print(f"获取视频时长失败: {e}")
logger.error(f"获取视频时长失败: {e}")
return 0.0
def dat_to_mp4(dat_file: str, id: str):
"""将 .dat 文件当作 .usm 文件处理,提取并转换为 .mp4"""
dat_path = Path(dat_file).resolve()
base_name = dat_path.stem
work_dir = Path("/Users/bennett/PJCK/CookiesChartConverter") / "work" / id
work_dir = Path(work_dirs) / id
usm_path = dat_path
ivf_dir = work_dir / "output" / f"{base_name}.dat" / "videos"
mp4_path = work_dir / f"{base_name}.mp4"
# Step 1: 提取 USM 内容
print(f"[1/3] 提取 USM 内容 ...")
logger.info(f"[1/3] 提取 USM 内容 ...")
extract_usm(usm_path, work_dir)
# Step 2: 找到第一个 .ivf 文件
ivf_files = list(ivf_dir.glob("*.ivf"))
if not ivf_files:
print(f"❌ 提取失败,未找到 .ivf 文件")
logger.error(f"❌ 提取失败,未找到 .ivf 文件")
return None
ivf_path = ivf_files[0]
print(f"[2/3] 转换为 MP4 ...")
logger.info(f"[2/3] 转换为 MP4 ...")
convert_ivf_to_mp4(ivf_path, mp4_path)
# Step 3: 检查视频时长
duration = get_video_duration(mp4_path)
if duration < 1.0:
print(f"⚠️ 视频时长 {duration:.2f}s 太短,跳过生成 pv.mp4")
logger.warning(f"⚠️ 视频时长 {duration:.2f}s 太短,跳过生成 pv.mp4")
return None
print(f"[3/3] 成功生成:{mp4_path}")
logger.info(f"[3/3] 成功生成:{mp4_path}")
process_video(mp4_path, work_dir / "pv.mp4")
return mp4_path
return work_dir / "pv.mp4"
# === 示例用法 ===
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("用法: python dat_to_mp4.py <xxx.dat>")
if len(sys.argv) != 3:
logger.info("用法: python dat_to_mp4.py <xxx.dat>")
exit(1)
dat_file = sys.argv[1]
mp4_output = dat_to_mp4(dat_file)
mp4_output = dat_to_mp4(dat_file,sys.argv[2])
if mp4_output:
converted_mp4_path = f"pv.mp4"
print(f"[4/4] 开始转换为新的 MP4 文件:{converted_mp4_path}")
logger.info(f"[4/4] 开始转换为新的 MP4 文件:{converted_mp4_path}")

View File

@ -3,9 +3,9 @@ import os
from loguru import logger
from ReadOpt import parse_music_xml, level_name
from config import streaming_assets
# 根目录
streaming_assets = "/Users/bennett/Downloads/SDEZ/Package/Sinmai_Data/StreamingAssets"
def search_music_by_id(search_id):
for asset_dir in os.listdir(streaming_assets):