"""Convert game music assets (ma2 charts, audio, image, PV) into Simai folders."""

import os
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional

from loguru import logger

from ab2png import convert_ab_to_png
from acb2mp3 import convert_awb_to_wav, convert_wav_to_mp3
from MaichartConverter import ma2tosimai
from pv_decode import dat_to_mp4
from search import search_music_by_id


# The helper modules imported above are assumed to be implemented elsewhere.
def build_maidata_txt(
    title: str = "",
    freemsg: str = "",
    bpm: str = "",
    first_notes: Optional[dict] = None,  # {2: 0.123, 3: ..., ...}
    levels: Optional[dict] = None,  # {2: "3", 3: "5", ...}
    designers: Optional[dict] = None,  # {2: "Author A", 3: ..., ...}
    charts: Optional[dict] = None,  # {2: "chart data\n...", ...}
    levelnum: Optional[int] = None,
) -> str:
    """Build the text content of a ``maidata.txt`` file.

    Args:
        title: Value written to the ``&title=`` field.
        freemsg: Value written to the ``&artist=`` field.
        bpm: Value written to the ``&wholebpm=`` field.
        first_notes: Per-difficulty offsets, keyed by difficulty index;
            formatted with three decimals. Missing keys yield an empty value.
        levels: Per-difficulty level labels, keyed by difficulty index.
        designers: Per-difficulty chart designers, keyed by difficulty index.
        charts: Per-difficulty chart text, keyed by difficulty index.
        levelnum: Number of difficulty slots to emit, clamped to ``[2, 6]``.
            When ``None``, the number of entries in ``charts`` is used.

    Returns:
        The newline-joined maidata text. Difficulty indices start at 2 and
        run for ``levelnum`` consecutive slots.
    """
    if levelnum is None:
        # The original default (None) crashed in min(); fall back to the
        # number of charts actually supplied.
        levelnum = len(charts) if charts else 0
    # Clamp levelnum to [2, 6].
    levelnum = max(2, min(levelnum, 6))
    slots = range(2, levelnum + 2)

    maidata = [
        f"&title={title}",
        f"&artist={freemsg}",
        f"&wholebpm={bpm}",
        "&first=0",
    ]

    for i in slots:
        if first_notes and i in first_notes:
            first = f"{first_notes[i]:.3f}"
        else:
            first = ""
        maidata.append(f"&first_{i}={first}")

    for i in slots:
        lv = levels.get(i, "") if levels else ""
        des = designers.get(i, "") if designers else ""
        maidata.append(f"&lv_{i}={lv}")
        maidata.append(f"&des_{i}={des}")

    for i in slots:
        # Treat a missing or None chart as empty instead of crashing on strip().
        chart = (charts.get(i) if charts else None) or ""
        maidata.append(f"&inote_{i}=")
        maidata.append(chart.strip())

    maidata.append("&amsg_time=")
    maidata.append("&amsg_content=")

    return "\n".join(maidata)


def _convert_charts(ma2_list):
    """Convert each ma2 file and return ``{difficulty index: chart text}``."""
    convert_results = {}
    for path in ma2_list:
        filename = os.path.basename(path)
        try:
            num = int(filename[-6:-4])  # extract the "_00" suffix -> 0
            level = num + 2  # map to the Simai difficulty index
            convert_results[level] = ma2tosimai(path)
        except Exception as e:
            # Best effort: one broken chart must not abort the whole song.
            logger.error(f"处理 {filename} 时出错: {e}")
    return convert_results


def _copy_outputs(source_folder, target_folder):
    """Copy the generated files from the work folder into the target folder."""
    files_to_copy = ["bg.png", "maidata.txt", "pv.mp4", "track.mp3"]

    # exist_ok avoids a check-then-create race between worker threads.
    os.makedirs(target_folder, exist_ok=True)

    for file_name in files_to_copy:
        src_file = os.path.join(source_folder, file_name)
        dst_file = os.path.join(target_folder, file_name)
        if os.path.exists(src_file):
            shutil.copy(src_file, dst_file)
            logger.info(f"文件 {file_name} 复制成功")
        else:
            logger.warning(f"文件 {file_name} 不存在,跳过复制")


def convert_to_simai_folder(result, output_folder):
    """Convert one search result into ``<output_folder>/<version>/<name>``.

    Args:
        result: Indexable record read as ``result[0]`` = info
            (``[id, name, artist, per-chart dicts with "designer" and
            "levelshow"]``), ``result[1]`` = ma2 paths, ``result[2]`` = ab
            file, ``result[3]`` = acb/awb paths, ``result[4]`` = dat file,
            ``result[5]`` = version name.
        output_folder: Root folder that receives the converted song folder.

    Returns:
        Always ``None``. The conversion is skipped when a required field is
        missing or empty.
    """
    info = result[0]
    music_id = info[0]
    name = info[1]
    artist = info[2]
    designers = {i + 2: item["designer"] for i, item in enumerate(info[3])}
    levels = {i + 2: item["levelshow"] for i, item in enumerate(info[3])}
    ma2_list = result[1]
    ab_file = result[2]
    acb_list = result[3]
    awb_file = next((f for f in acb_list or () if f.endswith(".awb")), None)
    dat_file = result[4]
    versionname = result[5]

    # Validate before touching the filesystem. Containers are checked by
    # truthiness: the previous `x is []` comparisons were always False.
    if (
        name is None
        or artist is None
        or not designers
        or not levels
        or not ma2_list
        or not acb_list
        or awb_file is None
        or dat_file is None
        or versionname is None
    ):
        logger.warning(f"{music_id} 缺少必要资源,跳过转换")
        return None

    work_dir = Path("work") / music_id
    work_dir.mkdir(parents=True, exist_ok=True)
    try:
        convert_results = _convert_charts(ma2_list)

        temp_wav = str(work_dir / "temp.wav")
        convert_awb_to_wav(awb_file, temp_wav)
        convert_wav_to_mp3(temp_wav, str(work_dir / "track.mp3"))
        convert_ab_to_png(ab_file, str(work_dir / "bg.png"))
        # NOTE(review): dat_to_mp4 is presumably what produces pv.mp4 inside
        # the work folder, since nothing else here writes it - confirm.
        dat_to_mp4(dat_file, music_id)

        maidata_txt = build_maidata_txt(
            title=name,
            levels=levels,
            designers=designers,
            charts=convert_results,
            levelnum=len(ma2_list),
            freemsg=artist,
        )
        # Explicit UTF-8: titles and designers are often non-ASCII and the
        # platform default encoding may not be able to represent them.
        with open(work_dir / "maidata.txt", "w", encoding="utf-8") as f:
            f.write(maidata_txt)

        target_folder = f"{output_folder}/{versionname}/{name}"
        _copy_outputs(str(work_dir), target_folder)
    finally:
        # Remove the temporary work folder even when a step fails.
        if work_dir.exists():
            shutil.rmtree(work_dir)


def main():
    """Example run: convert a fixed list of music ids using a thread pool."""
    music_ids = [834, 799]
    output_folder = "result"

    max_workers = 6  # tune to the available CPU and disk throughput

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {}
        for mid in music_ids:
            res = search_music_by_id(str(mid))
            if res is None:
                continue
            logger.info(f"提交任务: {mid}")
            future = executor.submit(convert_to_simai_folder, res, output_folder)
            futures[future] = mid

        for future in as_completed(futures):
            mid = futures[future]
            try:
                future.result()
                logger.info(f"{mid} 处理完成")
            except Exception as e:
                logger.error(f"{mid} 处理出错: {e}")


# Example invocation
if __name__ == "__main__":
    main()