CookiesChartConverter/getchart.py
2025-06-20 22:29:24 +08:00

330 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import string
import threading
import time
from pathlib import Path
import shutil
from MaichartConverter import ma2tosimai
from ab2png import convert_ab_to_png
from acb2mp3 import convert_awb_to_wav, convert_wav_to_mp3
from config import work as work_dir_path
from search import search_music_by_id
from pv_decode import dat_to_mp4
from loguru import logger
from concurrent.futures import ThreadPoolExecutor, as_completed
from flask import Flask, request, send_file, jsonify, after_this_request, abort, render_template, send_from_directory
import os
from flask_cors import CORS, cross_origin
from findsong import find_song_id
# Flask application object; flask_cors.CORS is applied to it with its default settings.
app = Flask(__name__)
CORS(app)
# Assumes the helper functions imported above are already implemented.
# In-memory task table keyed by task id ("mid"). Each value is a dict with the
# keys 'status' ('CV' = converting, 'OK' = done, 'NG' = failed), 'music_id',
# 'zip_path' and 'msg'.
tasks = {}
# Taken around reads and writes of `tasks` by the request handlers and task_runner.
tasks_lock = threading.Lock()
# Pool that runs task_runner jobs in the background (at most 4 worker threads).
executor = ThreadPoolExecutor(max_workers=4)
def random_mid(length=16):
    """Return a random alphanumeric task id ("mid") of ``length`` characters.

    The mid is handed to the client and is afterwards the only thing needed to
    poll a task or download its zip, so it is drawn from the ``secrets``
    CSPRNG rather than the predictable ``random`` generator.

    Args:
        length: Number of characters to generate. Values <= 0 yield ``""``.

    Returns:
        A string consisting only of ASCII letters and digits.
    """
    # Function-scope import so this block does not depend on a file-level edit.
    import secrets

    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
def process_audio(awb_path, output_dir: Path):
    """Convert an ``.awb`` audio file into ``track.mp3`` inside ``output_dir``.

    The conversion goes through an intermediate ``temp.wav`` in the same
    directory. That intermediate file is removed even when one of the two
    conversion steps raises, so a failed run does not leave it behind.

    Args:
        awb_path: Path of the source ``.awb`` file, passed to
            ``convert_awb_to_wav`` unchanged.
        output_dir: Directory that receives ``temp.wav`` and ``track.mp3``.

    Raises:
        Whatever ``convert_awb_to_wav`` / ``convert_wav_to_mp3`` raise; the
        exception propagates after the temporary wav has been cleaned up.
    """
    wav_path = output_dir / "temp.wav"
    mp3_path = output_dir / "track.mp3"
    try:
        convert_awb_to_wav(awb_path, wav_path)
        convert_wav_to_mp3(wav_path, mp3_path)
    finally:
        # Remove the temporary wav on success and on failure alike.
        if wav_path.exists():
            wav_path.unlink()
def build_maidata_txt(
    title: str = "",
    freemsg: str = "",
    bpm: str = "",
    first_notes: dict = None,  # {2: 0.123, 3: ..., ...}
    levels: dict = None,       # {2: "3", 3: "5", ...}
    designers: dict = None,    # {2: "author A", 3: ..., ...}
    charts: dict = None,       # {2: "chart data\n...", ...}
    levelnum: int = None,
) -> str:
    """Build the text of a ``maidata.txt`` file.

    All per-difficulty dicts are keyed by the difficulty index, starting at 2.
    Indices ``2 .. levelnum + 1`` are emitted; a missing entry produces an
    empty field.

    Args:
        title: Value written to ``&title``.
        freemsg: Value written to ``&artist``.
        bpm: Value written to ``&wholebpm``.
        first_notes: Optional index -> offset (number), written to
            ``&first_<i>`` with three decimals.
        levels: Optional index -> level label, written to ``&lv_<i>``.
        designers: Optional index -> designer name, written to ``&des_<i>``.
        charts: Optional index -> chart text, written (stripped) after
            ``&inote_<i>=``.
        levelnum: Number of difficulties to emit, clamped to ``[2, 6]``. When
            omitted it is inferred from the highest index present in the
            supplied dicts (previously omitting it raised ``TypeError``).

    Returns:
        The file content as a single newline-joined string.
    """
    if levelnum is None:
        # Cover every index the caller actually supplied data for.
        supplied = [
            key
            for table in (first_notes, levels, designers, charts)
            if table
            for key in table
        ]
        levelnum = max(supplied) - 1 if supplied else 2

    # Clamp levelnum to [2, 6].
    levelnum = max(2, min(levelnum, 6))
    indices = range(2, levelnum + 2)

    maidata = [
        f"&title={title}",
        f"&artist={freemsg}",
        f"&wholebpm={bpm}",
        "&first=0",
    ]

    for i in indices:
        offset = first_notes.get(i) if first_notes else None
        # A missing or None offset becomes an empty field instead of a format error.
        first = f"{offset:.3f}" if offset is not None else ""
        maidata.append(f"&first_{i}={first}")

    for i in indices:
        lv = levels.get(i, "") if levels else ""
        des = designers.get(i, "") if designers else ""
        maidata.append(f"&lv_{i}={lv}")
        maidata.append(f"&des_{i}={des}")

    for i in indices:
        # `or ""` tolerates a None chart (e.g. a converter that returned nothing).
        chart = (charts.get(i) or "") if charts else ""
        maidata.append(f"&inote_{i}=")
        maidata.append(chart.strip())

    maidata.append("&amsg_time=")
    maidata.append("&amsg_content=")
    return "\n".join(maidata)
def convert_to_simai_folder(result, output_folder):
    """Convert one song's game assets into a zip placed in ``output_folder``.

    ``result`` is indexed positionally:
        result[0]: info sequence -- [0] music id, [1] name, [2] artist,
                   [3] list of dicts with "designer" and "levelshow" keys
        result[1]: list of ma2 chart paths
        result[2]: ab file, converted to ``bg.png``
        result[3]: list of acb/awb paths (the first ``.awb`` entry is used)
        result[4]: dat file, converted to the pv mp4
        result[5]: version name (only checked for presence)

    The zip contains ``<name>/bg.png``, ``<name>/maidata.txt``,
    ``<name>/track.mp3`` and, when available, ``<name>/pv.mp4``. Files that
    were not produced are skipped with a warning.

    The per-song work directory and the intermediate mp4 are removed on
    success and on failure alike; cleanup is best-effort and never masks the
    original error.

    Args:
        result: Song record as described above.
        output_folder: Existing directory that receives ``<name>.zip``.

    Returns:
        ``Path`` of the created zip, or ``None`` when the record is incomplete.

    Raises:
        Whatever ``dat_to_mp4`` or the zip/file writes raise. Failures of the
        background and audio conversions are logged, not raised.
    """
    import zipfile

    info = result[0]
    music_id = info[0]
    name = info[1]
    artist = info[2]
    designers = {i + 2: item["designer"] for i, item in enumerate(info[3])}
    levels = {i + 2: item["levelshow"] for i, item in enumerate(info[3])}
    ma2_list = result[1]
    ab_file = result[2]
    acb_list = result[3]
    awb_file = next((f for f in acb_list if f.endswith('.awb')), None)
    dat_file = result[4]
    version_name = result[5]

    if not all([name, artist, designers, levels, ma2_list, ab_file, awb_file, dat_file, version_name]):
        logger.warning(f"[{music_id}] 信息不完整,跳过")
        return None

    work = Path(work_dir_path)  # work_dir_path may be a plain string
    work.mkdir(parents=True, exist_ok=True)
    work_dir = work / music_id
    work_dir.mkdir(parents=True, exist_ok=True)

    mp4_path = None
    try:
        # 1. Run the three asset conversions in parallel. The pool is named
        #    `pool` so it does not shadow the module-level `executor`.
        with ThreadPoolExecutor(max_workers=3) as pool:
            futures = {
                "ab": pool.submit(convert_ab_to_png, ab_file, work_dir / "bg.png"),
                "dat": pool.submit(dat_to_mp4, dat_file, music_id),
                "audio": pool.submit(process_audio, awb_file, work_dir),
            }
            mp4_path = futures["dat"].result()

        # The background/audio results are optional for the zip, but a failure
        # should at least be visible in the log instead of vanishing.
        for label in ("ab", "audio"):
            error = futures[label].exception()
            if error is not None:
                logger.error(f"[{music_id}] {label} conversion failed: {error}")

        # 2. Convert every chart and write maidata.txt.
        convert_results = {}
        for path in ma2_list:
            filename = os.path.basename(path)
            try:
                # Two digits before the 4-char extension, e.g. "_00.ma2" -> 0.
                num = int(filename[-6:-4])
                level = num + 2
                convert_results[level] = ma2tosimai(path)
            except Exception as e:
                logger.error(f"处理 {filename} 时出错: {e}")

        maidata_txt = build_maidata_txt(
            title=name,
            freemsg=artist,
            bpm="",
            levels=levels,
            designers=designers,
            charts=convert_results,
            levelnum=len(ma2_list),
        )
        with open(work_dir / "maidata.txt", "w", encoding="utf-8") as f:
            f.write(maidata_txt)

        # 3. Build the zip.
        # NOTE(review): `name` is used verbatim as a file name and archive
        # folder; a title containing a path separator would break this --
        # confirm whether titles need sanitising.
        zip_path = Path(output_folder) / f"{name}.zip"
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for file in ["bg.png", "maidata.txt", "track.mp3"]:
                fpath = work_dir / file
                if fpath.exists():
                    zipf.write(fpath, arcname=f"{name}/{file}")
                else:
                    logger.warning(f"{fpath} 不存在,跳过")
            if mp4_path and os.path.exists(mp4_path):
                zipf.write(mp4_path, arcname=f"{name}/pv.mp4")

        logger.success(f"[{music_id}] 打包完成:{zip_path}")
        return zip_path
    finally:
        # Always drop intermediates, including when a step above raised.
        shutil.rmtree(work_dir, ignore_errors=True)
        if mp4_path and os.path.exists(mp4_path):
            try:
                os.remove(mp4_path)
            except OSError as e:
                logger.warning(f"[{music_id}] could not remove {mp4_path}: {e}")
def task_runner(mid, music_id):
    """Run one conversion task and record its outcome in ``tasks[mid]``.

    Status transitions: 'CV' (converting) -> 'OK' with ``zip_path`` set, or
    'NG' with ``msg`` set to the error text.

    A ``None`` result from ``convert_to_simai_folder`` (incomplete song data)
    is treated as a failure; previously such a task was reported as 'OK' even
    though no zip existed.

    Args:
        mid: Task id; ``tasks[mid]`` must already exist.
        music_id: Id passed to ``search_music_by_id``.

    Raises:
        Exception: Re-raised after the task has been marked 'NG'.
    """
    with tasks_lock:
        tasks[mid]['status'] = 'CV'  # converting
    try:
        music_data = search_music_by_id(music_id)
        if music_data is None:
            raise Exception(f"music_id {music_id} 未找到")
        zip_path = convert_to_simai_folder(music_data, "result")
        if zip_path is None:
            # No archive was produced, so the task must not be reported as done.
            raise Exception(f"music_id {music_id} 信息不完整,无法转换")
        with tasks_lock:
            tasks[mid]['status'] = 'OK'
            tasks[mid]['zip_path'] = zip_path
    except Exception as e:
        with tasks_lock:
            tasks[mid]['status'] = 'NG'
            tasks[mid]['msg'] = str(e)
        logger.error(f"任务 {mid} 失败: {e}")
        # Bare raise keeps the original traceback intact.
        raise
@app.route('/UpsertConvert', methods=['POST'])
def upsert_convert():
    """Queue a conversion for the posted ``music_id`` and return a new task id.

    Expects a JSON body containing ``music_id``. Responds with
    ``{"status": "success", "mid": <id>}``, or a 400 error when the field is
    missing. A fresh mid is allocated on every call.
    """
    payload = request.get_json()
    if not payload or 'music_id' not in payload:
        return jsonify({"error": "缺少 music_id"}), 400

    song_id = str(payload['music_id'])
    mid = random_mid()
    record = dict(status='CV', music_id=song_id, zip_path=None, msg=None)

    with tasks_lock:
        tasks[mid] = record

    executor.submit(task_runner, mid, song_id)
    return jsonify(status="success", mid=mid)
@app.route('/GetConvertStatus', methods=['POST'])
def get_convert_status():
    """Report the state of the task named by the posted ``mid``.

    Responds with ``{"status": "CV" | "OK", "mid": ...}`` while converting or
    when done, and ``{"status": "NG", "mid": ..., "msg": ...}`` for any other
    state. Returns 400 when ``mid`` is missing and 404 for an unknown task.
    """
    body = request.get_json()
    if not body or 'mid' not in body:
        return jsonify({"error": "缺少 mid"}), 400

    mid = body['mid']
    with tasks_lock:
        record = tasks.get(mid)
    if not record:
        return jsonify({"error": "任务不存在"}), 404

    state = record['status']
    if state in ('CV', 'OK'):
        return jsonify({"status": state, "mid": mid})

    # Anything else is reported as a failure, together with the stored message.
    return jsonify({"status": "NG", "mid": mid, "msg": record.get('msg', '')})
def delayed_delete(path, delay=300):
    """Delete ``path`` after ``delay`` seconds on a daemon thread.

    Returns immediately. If the file no longer exists when the delay elapses
    nothing happens; a failed removal is logged rather than raised.
    """
    def _remove_after_wait():
        time.sleep(delay)
        if not os.path.exists(path):
            return
        try:
            os.remove(path)
            logger.info(f"延迟 {delay}s 后已删除 zip 文件: {path}")
        except Exception as exc:
            logger.error(f"延迟删除失败: {exc}")

    worker = threading.Thread(target=_remove_after_wait, daemon=True)
    worker.start()
@app.route('/GetConvertZip', methods=['POST', 'GET'])
def get_convert_zip():
    """Send the finished zip for the task identified by ``mid``.

    ``mid`` is looked up in the form data, then the query string, then a JSON
    body. Error responses: 400 when ``mid`` is missing or the task is still
    running, 404 for an unknown task or a missing file, 500 for a failed task.
    Each successful download schedules the zip for deletion 300 seconds later.
    """
    mid = request.form.get('mid') or request.args.get('mid')
    if not mid:
        body = request.get_json(silent=True)
        mid = body.get('mid') if body else mid
    if not mid:
        return jsonify({"error": "缺少 mid 参数"}), 400

    with tasks_lock:
        record = tasks.get(mid)
    if not record:
        return jsonify({"error": "任务不存在"}), 404

    state = record['status']
    if state == 'NG':
        return jsonify({"error": "任务失败,无法下载"}), 500
    if state != 'OK':
        return jsonify({"error": "任务未完成"}), 400

    archive = record.get('zip_path')
    if not (archive and os.path.isfile(archive)):
        return jsonify({"error": "zip 文件不存在"}), 404

    delayed_delete(archive, delay=300)
    return send_file(
        archive,
        as_attachment=True,
        download_name=os.path.basename(archive),
    )
@app.route('/')
def index():
    """Serve a static liveness page."""
    # The second line of the page is an HTML comment, so it is not rendered.
    page = (
        "<h1>It works!</h1>\n"
        "<!-- Mari我想开大运创似你 -->"
    )
    return page
@app.route('/search', methods=['GET'])
def search():
    """Return ``find_song_id`` applied to the ``text`` query parameter.

    The parameter is forwarded as-is, i.e. ``None`` when it is absent.
    """
    return find_song_id(request.args.get('text'))
@app.route('/covers/<int:N>.png')
def get_cover(N):
    """Serve cover image ``<N>.png`` from the covers directory, or 404."""
    # Directory that holds the cover images.
    covers_dir = "/Users/bennett/LingtuBot/NaiiBot/stote/static/mai/cover"
    filename = f'{N}.png'

    # Unknown cover: answer 404 (abort raises, so nothing below runs).
    if not os.path.exists(os.path.join(covers_dir, filename)):
        abort(404)

    return send_from_directory(covers_dir, filename)
if __name__ == '__main__':
    # Make sure the zip output directory exists (no-op when already present).
    os.makedirs("result", exist_ok=True)

    # The server listens on every interface, so the Werkzeug debugger (which
    # allows arbitrary code execution) must not be on by default. Opt in
    # explicitly for local development with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host='0.0.0.0', port=16828)