mirror of
https://gitea.cookies.4d.ink/Cookies/CookiesChartConverter.git
synced 2025-10-26 03:02:39 +00:00
330 lines
9.2 KiB
Python
330 lines
9.2 KiB
Python
|
||
import random
|
||
import string
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
import shutil
|
||
|
||
from MaichartConverter import ma2tosimai
|
||
from ab2png import convert_ab_to_png
|
||
from acb2mp3 import convert_awb_to_wav, convert_wav_to_mp3
|
||
from config import work as work_dir_path
|
||
from search import search_music_by_id
|
||
from pv_decode import dat_to_mp4
|
||
from loguru import logger
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from flask import Flask, request, send_file, jsonify, after_this_request, abort, render_template, send_from_directory
|
||
import os
|
||
from flask_cors import CORS, cross_origin
|
||
from findsong import find_song_id
|
||
|
||
app = Flask(__name__)
|
||
CORS(app)
|
||
# 假设你已实现以下函数
|
||
tasks = {}
|
||
tasks_lock = threading.Lock()
|
||
executor = ThreadPoolExecutor(max_workers=4)
|
||
|
||
|
||
def random_mid(length=16):
|
||
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
|
||
|
||
def process_audio(awb_path, output_dir: Path):
|
||
wav_path = output_dir / "temp.wav"
|
||
mp3_path = output_dir / "track.mp3"
|
||
convert_awb_to_wav(awb_path, wav_path)
|
||
convert_wav_to_mp3(wav_path, mp3_path)
|
||
if wav_path.exists():
|
||
wav_path.unlink() # 删除临时 wav 文件
|
||
|
||
|
||
|
||
def build_maidata_txt(
|
||
title: str = "",
|
||
freemsg: str = "",
|
||
bpm: str = "",
|
||
first_notes: dict = None, # {2: 0.123, 3: ..., ...}
|
||
levels: dict = None, # {2: "3", 3: "5", ...}
|
||
designers: dict = None, # {2: "作者A", 3: ..., ...}
|
||
charts: dict = None, # {2: "谱面数据\n...", ...}
|
||
levelnum: int = None,
|
||
) -> str:
|
||
# 限制 levelnum 在 [2, 6]
|
||
levelnum = max(2, min(levelnum, 6))
|
||
|
||
maidata = [
|
||
f"&title={title}",
|
||
f"&artist={freemsg}",
|
||
f"&wholebpm={bpm}",
|
||
"&first=0"
|
||
]
|
||
|
||
for i in range(2, levelnum + 2):
|
||
first = f"{first_notes.get(i):.3f}" if first_notes and i in first_notes else ""
|
||
maidata.append(f"&first_{i}={first}")
|
||
|
||
for i in range(2, levelnum + 2):
|
||
lv = levels.get(i, "") if levels else ""
|
||
des = designers.get(i, "") if designers else ""
|
||
maidata.append(f"&lv_{i}={lv}")
|
||
maidata.append(f"&des_{i}={des}")
|
||
|
||
for i in range(2, levelnum + 2):
|
||
chart = charts.get(i, "") if charts else ""
|
||
maidata.append(f"&inote_{i}=")
|
||
maidata.append(chart.strip())
|
||
|
||
maidata.append("&amsg_time=")
|
||
maidata.append("&amsg_content=")
|
||
|
||
return "\n".join(maidata)
|
||
|
||
|
||
|
||
def convert_to_simai_folder(result, output_folder):
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
import zipfile
|
||
|
||
info = result[0]
|
||
music_id = info[0]
|
||
name = info[1]
|
||
artist = info[2]
|
||
designers = {i + 2: item["designer"] for i, item in enumerate(info[3])}
|
||
levels = {i + 2: item["levelshow"] for i, item in enumerate(info[3])}
|
||
ma2_list = result[1]
|
||
ab_file = result[2]
|
||
acb_list = result[3]
|
||
awb_file = next((f for f in acb_list if f.endswith('.awb')), None)
|
||
dat_file = result[4]
|
||
version_name = result[5]
|
||
|
||
if not all([name, artist, designers, levels, ma2_list, ab_file, awb_file, dat_file, version_name]):
|
||
logger.warning(f"[{music_id}] 信息不完整,跳过")
|
||
return None
|
||
|
||
work = Path(work_dir_path) # 如果之前是字符串
|
||
work.mkdir(parents=True, exist_ok=True) # 先创建 work 目录
|
||
|
||
work_dir = work / music_id
|
||
work_dir.mkdir(parents=True, exist_ok=True) # 再创建子目录
|
||
|
||
# 1. 并行执行数据转换
|
||
with ThreadPoolExecutor(max_workers=3) as executor:
|
||
futures = {
|
||
"ab": executor.submit(convert_ab_to_png, ab_file, work_dir / "bg.png"),
|
||
"dat": executor.submit(dat_to_mp4, dat_file, music_id),
|
||
"audio": executor.submit(process_audio, awb_file, work_dir)
|
||
}
|
||
|
||
mp4_path = futures["dat"].result()
|
||
audio_mp3 = work_dir / "track.mp3"
|
||
|
||
# 2. 处理谱面并生成 maidata.txt
|
||
convert_results = {}
|
||
for path in ma2_list:
|
||
filename = os.path.basename(path)
|
||
try:
|
||
num = int(filename[-6:-4]) # 提取 _00 → 0
|
||
level = num + 2
|
||
convert_results[level] = ma2tosimai(path)
|
||
except Exception as e:
|
||
logger.error(f"处理 {filename} 时出错: {e}")
|
||
|
||
maidata_txt = build_maidata_txt(
|
||
title=name,
|
||
freemsg=artist,
|
||
bpm="",
|
||
levels=levels,
|
||
designers=designers,
|
||
charts=convert_results,
|
||
levelnum=len(ma2_list),
|
||
)
|
||
|
||
with open(work_dir / "maidata.txt", "w", encoding="utf-8") as f:
|
||
f.write(maidata_txt)
|
||
|
||
# 3. 生成 zip 文件
|
||
zip_path = Path(output_folder) / f"{name}.zip"
|
||
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
|
||
for file in ["bg.png", "maidata.txt", "track.mp3"]:
|
||
fpath = work_dir / file
|
||
if fpath.exists():
|
||
zipf.write(fpath, arcname=f"{name}/{file}")
|
||
else:
|
||
logger.warning(f"{fpath} 不存在,跳过")
|
||
|
||
if mp4_path and os.path.exists(mp4_path):
|
||
zipf.write(mp4_path, arcname=f"{name}/pv.mp4")
|
||
|
||
logger.success(f"[{music_id}] 打包完成:{zip_path}")
|
||
|
||
shutil.rmtree(work_dir)
|
||
if mp4_path and os.path.exists(mp4_path):
|
||
os.remove(mp4_path)
|
||
|
||
return zip_path
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
def task_runner(mid, music_id):
|
||
with tasks_lock:
|
||
tasks[mid]['status'] = 'CV' # 转换中
|
||
|
||
try:
|
||
music_data = search_music_by_id(music_id)
|
||
if music_data is None:
|
||
raise Exception(f"music_id {music_id} 未找到")
|
||
|
||
zip_path = convert_to_simai_folder(music_data, "result")
|
||
|
||
with tasks_lock:
|
||
tasks[mid]['status'] = 'OK'
|
||
tasks[mid]['zip_path'] = zip_path
|
||
|
||
except Exception as e:
|
||
with tasks_lock:
|
||
tasks[mid]['status'] = 'NG'
|
||
tasks[mid]['msg'] = str(e)
|
||
logger.error(f"任务 {mid} 失败: {e}")
|
||
raise e
|
||
|
||
@app.route('/UpsertConvert', methods=['POST'])
|
||
def upsert_convert():
|
||
data = request.get_json()
|
||
if not data or 'music_id' not in data:
|
||
return jsonify({"error": "缺少 music_id"}), 400
|
||
|
||
music_id = str(data['music_id'])
|
||
|
||
# 每次都分配新的 mid
|
||
mid = random_mid()
|
||
with tasks_lock:
|
||
tasks[mid] = {
|
||
'status': 'CV',
|
||
'music_id': music_id,
|
||
'zip_path': None,
|
||
'msg': None,
|
||
}
|
||
|
||
executor.submit(task_runner, mid, music_id)
|
||
|
||
return jsonify({"status": "success", "mid": mid})
|
||
|
||
|
||
@app.route('/GetConvertStatus', methods=['POST'])
|
||
def get_convert_status():
|
||
data = request.get_json()
|
||
if not data or 'mid' not in data:
|
||
return jsonify({"error": "缺少 mid"}), 400
|
||
|
||
mid = data['mid']
|
||
with tasks_lock:
|
||
info = tasks.get(mid)
|
||
|
||
if not info:
|
||
return jsonify({"error": "任务不存在"}), 404
|
||
|
||
status = info['status']
|
||
if status == 'CV':
|
||
# 模拟估算时间,比如固定5秒
|
||
return jsonify({"status": "CV", "mid": mid})
|
||
elif status == 'OK':
|
||
return jsonify({"status": "OK", "mid": mid})
|
||
else:
|
||
# NG失败
|
||
return jsonify({"status": "NG", "mid": mid, "msg": info.get('msg', '')})
|
||
|
||
|
||
|
||
def delayed_delete(path, delay=300):
|
||
def delete_file():
|
||
time.sleep(delay)
|
||
if os.path.exists(path):
|
||
try:
|
||
os.remove(path)
|
||
logger.info(f"延迟 {delay}s 后已删除 zip 文件: {path}")
|
||
except Exception as e:
|
||
logger.error(f"延迟删除失败: {e}")
|
||
threading.Thread(target=delete_file, daemon=True).start()
|
||
|
||
@app.route('/GetConvertZip', methods=['POST', 'GET'])
|
||
def get_convert_zip():
|
||
mid = request.form.get('mid') or request.args.get('mid')
|
||
if not mid:
|
||
json_data = request.get_json(silent=True)
|
||
if json_data:
|
||
mid = json_data.get('mid')
|
||
|
||
if not mid:
|
||
return jsonify({"error": "缺少 mid 参数"}), 400
|
||
|
||
with tasks_lock:
|
||
info = tasks.get(mid)
|
||
|
||
if not info:
|
||
return jsonify({"error": "任务不存在"}), 404
|
||
|
||
if info['status'] == 'NG':
|
||
return jsonify({"error": "任务失败,无法下载"}), 500
|
||
|
||
if info['status'] != 'OK':
|
||
return jsonify({"error": "任务未完成"}), 400
|
||
|
||
zip_path = info.get('zip_path')
|
||
if not zip_path or not os.path.isfile(zip_path):
|
||
return jsonify({"error": "zip 文件不存在"}), 404
|
||
|
||
delayed_delete(zip_path, delay=300)
|
||
return send_file(zip_path, as_attachment=True, download_name=os.path.basename(zip_path))
|
||
|
||
|
||
|
||
|
||
@app.route('/')
|
||
def index():
|
||
#<h2>Mari我想开大运创似你</h2>
|
||
return """<h1>It works!</h1>
|
||
<!-- Mari我想开大运创似你 -->"""
|
||
|
||
@app.route('/search',methods=['GET'])
|
||
def search():
|
||
text = request.args.get('text')
|
||
return find_song_id(text)
|
||
|
||
@app.route('/covers/<int:N>.png')
|
||
def get_cover(N):
|
||
# 构建图像文件名
|
||
filename = f'{N}.png'
|
||
# 定义图像文件存储的目录
|
||
covers_dir = "/Users/bennett/LingtuBot/NaiiBot/stote/static/mai/cover"
|
||
# 检查文件是否存在
|
||
if os.path.exists(os.path.join(covers_dir, filename)):
|
||
# 返回图像文件
|
||
return send_from_directory(covers_dir, filename)
|
||
else:
|
||
# 如果文件不存在,返回 404 错误
|
||
abort(404)
|
||
|
||
|
||
|
||
|
||
|
||
if __name__ == '__main__':
|
||
if not os.path.exists("result"):
|
||
os.makedirs("result")
|
||
app.run(debug=True, host='0.0.0.0',port=16828) |