mirror of
				https://gitea.cookies.4d.ink/Cookies/CookiesChartConverter.git
				synced 2025-10-26 03:02:39 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			330 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			330 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
 | ||
| import random
 | ||
| import string
 | ||
| import threading
 | ||
| import time
 | ||
| from pathlib import Path
 | ||
| import shutil
 | ||
| 
 | ||
| from MaichartConverter import ma2tosimai
 | ||
| from ab2png import convert_ab_to_png
 | ||
| from acb2mp3 import convert_awb_to_wav, convert_wav_to_mp3
 | ||
| from config import work as work_dir_path
 | ||
| from search import search_music_by_id
 | ||
| from pv_decode import dat_to_mp4
 | ||
| from loguru import logger
 | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed
 | ||
| from flask import Flask, request, send_file, jsonify, after_this_request, abort, render_template, send_from_directory
 | ||
| import os
 | ||
| from flask_cors import CORS, cross_origin
 | ||
| from findsong import find_song_id
 | ||
| 
 | ||
| app = Flask(__name__)
 | ||
| CORS(app)
 | ||
| # 假设你已实现以下函数
 | ||
| tasks = {}
 | ||
| tasks_lock = threading.Lock()
 | ||
| executor = ThreadPoolExecutor(max_workers=4)
 | ||
| 
 | ||
| 
 | ||
| def random_mid(length=16):
 | ||
|     return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
 | ||
| 
 | ||
| def process_audio(awb_path, output_dir: Path):
 | ||
|     wav_path = output_dir / "temp.wav"
 | ||
|     mp3_path = output_dir / "track.mp3"
 | ||
|     convert_awb_to_wav(awb_path, wav_path)
 | ||
|     convert_wav_to_mp3(wav_path, mp3_path)
 | ||
|     if wav_path.exists():
 | ||
|         wav_path.unlink()  # 删除临时 wav 文件
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| def build_maidata_txt(
 | ||
|     title: str = "",
 | ||
|     freemsg: str = "",
 | ||
|     bpm: str = "",
 | ||
|     first_notes: dict = None,  # {2: 0.123, 3: ..., ...}
 | ||
|     levels: dict = None,       # {2: "3", 3: "5", ...}
 | ||
|     designers: dict = None,    # {2: "作者A", 3: ..., ...}
 | ||
|     charts: dict = None,       # {2: "谱面数据\n...", ...}
 | ||
|     levelnum: int = None,
 | ||
| ) -> str:
 | ||
|     # 限制 levelnum 在 [2, 6]
 | ||
|     levelnum = max(2, min(levelnum, 6))
 | ||
| 
 | ||
|     maidata = [
 | ||
|         f"&title={title}",
 | ||
|         f"&artist={freemsg}",
 | ||
|         f"&wholebpm={bpm}",
 | ||
|         "&first=0"
 | ||
|     ]
 | ||
| 
 | ||
|     for i in range(2, levelnum + 2):
 | ||
|         first = f"{first_notes.get(i):.3f}" if first_notes and i in first_notes else ""
 | ||
|         maidata.append(f"&first_{i}={first}")
 | ||
| 
 | ||
|     for i in range(2, levelnum + 2):
 | ||
|         lv = levels.get(i, "") if levels else ""
 | ||
|         des = designers.get(i, "") if designers else ""
 | ||
|         maidata.append(f"&lv_{i}={lv}")
 | ||
|         maidata.append(f"&des_{i}={des}")
 | ||
| 
 | ||
|     for i in range(2, levelnum + 2):
 | ||
|         chart = charts.get(i, "") if charts else ""
 | ||
|         maidata.append(f"&inote_{i}=")
 | ||
|         maidata.append(chart.strip())
 | ||
| 
 | ||
|     maidata.append("&amsg_time=")
 | ||
|     maidata.append("&amsg_content=")
 | ||
| 
 | ||
|     return "\n".join(maidata)
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| def convert_to_simai_folder(result, output_folder):
 | ||
|     from concurrent.futures import ThreadPoolExecutor
 | ||
|     import zipfile
 | ||
| 
 | ||
|     info = result[0]
 | ||
|     music_id = info[0]
 | ||
|     name = info[1]
 | ||
|     artist = info[2]
 | ||
|     designers = {i + 2: item["designer"] for i, item in enumerate(info[3])}
 | ||
|     levels = {i + 2: item["levelshow"] for i, item in enumerate(info[3])}
 | ||
|     ma2_list = result[1]
 | ||
|     ab_file = result[2]
 | ||
|     acb_list = result[3]
 | ||
|     awb_file = next((f for f in acb_list if f.endswith('.awb')), None)
 | ||
|     dat_file = result[4]
 | ||
|     version_name = result[5]
 | ||
| 
 | ||
|     if not all([name, artist, designers, levels, ma2_list, ab_file, awb_file, dat_file, version_name]):
 | ||
|         logger.warning(f"[{music_id}] 信息不完整,跳过")
 | ||
|         return None
 | ||
| 
 | ||
|     work = Path(work_dir_path)  # 如果之前是字符串
 | ||
|     work.mkdir(parents=True, exist_ok=True)  # 先创建 work 目录
 | ||
| 
 | ||
|     work_dir = work / music_id
 | ||
|     work_dir.mkdir(parents=True, exist_ok=True)  # 再创建子目录
 | ||
| 
 | ||
|     # 1. 并行执行数据转换
 | ||
|     with ThreadPoolExecutor(max_workers=3) as executor:
 | ||
|         futures = {
 | ||
|             "ab": executor.submit(convert_ab_to_png, ab_file, work_dir / "bg.png"),
 | ||
|             "dat": executor.submit(dat_to_mp4, dat_file, music_id),
 | ||
|             "audio": executor.submit(process_audio, awb_file, work_dir)
 | ||
|         }
 | ||
| 
 | ||
|         mp4_path = futures["dat"].result()
 | ||
|         audio_mp3 = work_dir / "track.mp3"
 | ||
| 
 | ||
|     # 2. 处理谱面并生成 maidata.txt
 | ||
|     convert_results = {}
 | ||
|     for path in ma2_list:
 | ||
|         filename = os.path.basename(path)
 | ||
|         try:
 | ||
|             num = int(filename[-6:-4])  # 提取 _00 → 0
 | ||
|             level = num + 2
 | ||
|             convert_results[level] = ma2tosimai(path)
 | ||
|         except Exception as e:
 | ||
|             logger.error(f"处理 {filename} 时出错: {e}")
 | ||
| 
 | ||
|     maidata_txt = build_maidata_txt(
 | ||
|         title=name,
 | ||
|         freemsg=artist,
 | ||
|         bpm="",
 | ||
|         levels=levels,
 | ||
|         designers=designers,
 | ||
|         charts=convert_results,
 | ||
|         levelnum=len(ma2_list),
 | ||
|     )
 | ||
| 
 | ||
|     with open(work_dir / "maidata.txt", "w", encoding="utf-8") as f:
 | ||
|         f.write(maidata_txt)
 | ||
| 
 | ||
|     # 3. 生成 zip 文件
 | ||
|     zip_path = Path(output_folder) / f"{name}.zip"
 | ||
|     with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
 | ||
|         for file in ["bg.png", "maidata.txt", "track.mp3"]:
 | ||
|             fpath = work_dir / file
 | ||
|             if fpath.exists():
 | ||
|                 zipf.write(fpath, arcname=f"{name}/{file}")
 | ||
|             else:
 | ||
|                 logger.warning(f"{fpath} 不存在,跳过")
 | ||
| 
 | ||
|         if mp4_path and os.path.exists(mp4_path):
 | ||
|             zipf.write(mp4_path, arcname=f"{name}/pv.mp4")
 | ||
| 
 | ||
|     logger.success(f"[{music_id}] 打包完成:{zip_path}")
 | ||
| 
 | ||
|     shutil.rmtree(work_dir)
 | ||
|     if mp4_path and os.path.exists(mp4_path):
 | ||
|         os.remove(mp4_path)
 | ||
| 
 | ||
|     return zip_path
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| def task_runner(mid, music_id):
 | ||
|     with tasks_lock:
 | ||
|         tasks[mid]['status'] = 'CV'  # 转换中
 | ||
| 
 | ||
|     try:
 | ||
|         music_data = search_music_by_id(music_id)
 | ||
|         if music_data is None:
 | ||
|             raise Exception(f"music_id {music_id} 未找到")
 | ||
| 
 | ||
|         zip_path = convert_to_simai_folder(music_data, "result")
 | ||
| 
 | ||
|         with tasks_lock:
 | ||
|             tasks[mid]['status'] = 'OK'
 | ||
|             tasks[mid]['zip_path'] = zip_path
 | ||
| 
 | ||
|     except Exception as e:
 | ||
|         with tasks_lock:
 | ||
|             tasks[mid]['status'] = 'NG'
 | ||
|             tasks[mid]['msg'] = str(e)
 | ||
|         logger.error(f"任务 {mid} 失败: {e}")
 | ||
|         raise e
 | ||
| 
 | ||
| @app.route('/UpsertConvert', methods=['POST'])
 | ||
| def upsert_convert():
 | ||
|     data = request.get_json()
 | ||
|     if not data or 'music_id' not in data:
 | ||
|         return jsonify({"error": "缺少 music_id"}), 400
 | ||
| 
 | ||
|     music_id = str(data['music_id'])
 | ||
| 
 | ||
|     # 每次都分配新的 mid
 | ||
|     mid = random_mid()
 | ||
|     with tasks_lock:
 | ||
|         tasks[mid] = {
 | ||
|             'status': 'CV',
 | ||
|             'music_id': music_id,
 | ||
|             'zip_path': None,
 | ||
|             'msg': None,
 | ||
|         }
 | ||
| 
 | ||
|     executor.submit(task_runner, mid, music_id)
 | ||
| 
 | ||
|     return jsonify({"status": "success", "mid": mid})
 | ||
| 
 | ||
| 
 | ||
| @app.route('/GetConvertStatus', methods=['POST'])
 | ||
| def get_convert_status():
 | ||
|     data = request.get_json()
 | ||
|     if not data or 'mid' not in data:
 | ||
|         return jsonify({"error": "缺少 mid"}), 400
 | ||
| 
 | ||
|     mid = data['mid']
 | ||
|     with tasks_lock:
 | ||
|         info = tasks.get(mid)
 | ||
| 
 | ||
|     if not info:
 | ||
|         return jsonify({"error": "任务不存在"}), 404
 | ||
| 
 | ||
|     status = info['status']
 | ||
|     if status == 'CV':
 | ||
|         # 模拟估算时间,比如固定5秒
 | ||
|         return jsonify({"status": "CV", "mid": mid})
 | ||
|     elif status == 'OK':
 | ||
|         return jsonify({"status": "OK", "mid": mid})
 | ||
|     else:
 | ||
|         # NG失败
 | ||
|         return jsonify({"status": "NG", "mid": mid, "msg": info.get('msg', '')})
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| def delayed_delete(path, delay=300):
 | ||
|     def delete_file():
 | ||
|         time.sleep(delay)
 | ||
|         if os.path.exists(path):
 | ||
|             try:
 | ||
|                 os.remove(path)
 | ||
|                 logger.info(f"延迟 {delay}s 后已删除 zip 文件: {path}")
 | ||
|             except Exception as e:
 | ||
|                 logger.error(f"延迟删除失败: {e}")
 | ||
|     threading.Thread(target=delete_file, daemon=True).start()
 | ||
| 
 | ||
| @app.route('/GetConvertZip', methods=['POST', 'GET'])
 | ||
| def get_convert_zip():
 | ||
|     mid = request.form.get('mid') or request.args.get('mid')
 | ||
|     if not mid:
 | ||
|         json_data = request.get_json(silent=True)
 | ||
|         if json_data:
 | ||
|             mid = json_data.get('mid')
 | ||
| 
 | ||
|     if not mid:
 | ||
|         return jsonify({"error": "缺少 mid 参数"}), 400
 | ||
| 
 | ||
|     with tasks_lock:
 | ||
|         info = tasks.get(mid)
 | ||
| 
 | ||
|     if not info:
 | ||
|         return jsonify({"error": "任务不存在"}), 404
 | ||
| 
 | ||
|     if info['status'] == 'NG':
 | ||
|         return jsonify({"error": "任务失败,无法下载"}), 500
 | ||
| 
 | ||
|     if info['status'] != 'OK':
 | ||
|         return jsonify({"error": "任务未完成"}), 400
 | ||
| 
 | ||
|     zip_path = info.get('zip_path')
 | ||
|     if not zip_path or not os.path.isfile(zip_path):
 | ||
|         return jsonify({"error": "zip 文件不存在"}), 404
 | ||
| 
 | ||
|     delayed_delete(zip_path, delay=300)
 | ||
|     return send_file(zip_path, as_attachment=True, download_name=os.path.basename(zip_path))
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| @app.route('/')
 | ||
| def index():
 | ||
|     #<h2>Mari我想开大运创似你</h2>
 | ||
|     return """<h1>It works!</h1>
 | ||
|     <!-- Mari我想开大运创似你 -->"""
 | ||
| 
 | ||
| @app.route('/search',methods=['GET'])
 | ||
| def search():
 | ||
|     text = request.args.get('text')
 | ||
|     return find_song_id(text)
 | ||
| 
 | ||
| @app.route('/covers/<int:N>.png')
 | ||
| def get_cover(N):
 | ||
|     # 构建图像文件名
 | ||
|     filename = f'{N}.png'
 | ||
|     # 定义图像文件存储的目录
 | ||
|     covers_dir = "/Users/bennett/LingtuBot/NaiiBot/stote/static/mai/cover"
 | ||
|     # 检查文件是否存在
 | ||
|     if os.path.exists(os.path.join(covers_dir, filename)):
 | ||
|         # 返回图像文件
 | ||
|         return send_from_directory(covers_dir, filename)
 | ||
|     else:
 | ||
|         # 如果文件不存在,返回 404 错误
 | ||
|         abort(404)
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| if __name__ == '__main__':
 | ||
|     if not os.path.exists("result"):
 | ||
|         os.makedirs("result")
 | ||
|     app.run(debug=True, host='0.0.0.0',port=16828) |