import os import sys from dotenv import load_dotenv now_dir = os.getcwd() sys.path.append(now_dir) load_dotenv() from infer.modules.vc.modules import VC from i18n.i18n import I18nAuto from configs.config import Config from sklearn.cluster import MiniBatchKMeans import torch, platform import numpy as np import gradio as gr import faiss import fairseq import pathlib import json from time import sleep from subprocess import Popen from random import shuffle import warnings import traceback import threading import shutil import logging logging.getLogger("numba").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) logger = logging.getLogger(__name__) tmp = os.path.join(now_dir, "TEMP") shutil.rmtree(tmp, ignore_errors=True) shutil.rmtree("%s/runtime/Lib/site-packages/infer_pack" % (now_dir), ignore_errors=True) shutil.rmtree("%s/runtime/Lib/site-packages/uvr5_pack" % (now_dir), ignore_errors=True) os.makedirs(tmp, exist_ok=True) os.makedirs(os.path.join(now_dir, "logs"), exist_ok=True) os.makedirs(os.path.join(now_dir, "assets/weights"), exist_ok=True) os.environ["TEMP"] = tmp warnings.filterwarnings("ignore") torch.manual_seed(114514) config = Config() vc = VC(config) if config.dml == True: def forward_dml(ctx, x, scale): ctx.scale = scale res = x.clone().detach() return res fairseq.modules.grad_multiply.GradMultiply.forward = forward_dml i18n = I18nAuto() logger.info(i18n) # 判断是否有能用来训练和加速推理的N卡 ngpu = torch.cuda.device_count() gpu_infos = [] mem = [] if_gpu_ok = False if torch.cuda.is_available() or ngpu != 0: for i in range(ngpu): gpu_name = torch.cuda.get_device_name(i) if any( value in gpu_name.upper() for value in [ "10", "16", "20", "30", "40", "A2", "A3", "A4", "P4", "A50", "500", "A60", "70", "80", "90", "M4", "T4", "TITAN", "4060", "L", "6000", ] ): # A10#A100#V100#A40#P40#M40#K80#A4500 if_gpu_ok = True # 至少有一张能用的N卡 gpu_infos.append("%s\t%s" % (i, gpu_name)) mem.append( int( torch.cuda.get_device_properties(i).total_memory / 1024 / 1024 / 1024 + 0.4 ) ) if if_gpu_ok and len(gpu_infos) > 0: gpu_info = "\n".join(gpu_infos) default_batch_size = min(mem) // 2 else: gpu_info = i18n("很遗憾您这没有能用的显卡来支持您训练") default_batch_size = 1 gpus = "-".join([i[0] for i in gpu_infos]) class ToolButton(gr.Button, gr.components.FormComponent): """Small button with single emoji as text, fits inside gradio forms""" def __init__(self, **kwargs): super().__init__(variant="tool", **kwargs) def get_block_name(self): return "button" weight_root = os.getenv("weight_root") weight_uvr5_root = os.getenv("weight_uvr5_root") index_root = os.getenv("index_root") outside_index_root = os.getenv("outside_index_root") names = [] for name in os.listdir(weight_root): if name.endswith(".pth"): names.append(name) index_paths = [] def lookup_indices(index_root): global index_paths for root, dirs, files in os.walk(index_root, topdown=False): for name in files: if name.endswith(".index") and "trained" not in name: index_paths.append("%s/%s" % (root, name)) lookup_indices(index_root) lookup_indices(outside_index_root) uvr5_names = [] for name in os.listdir(weight_uvr5_root): if name.endswith(".pth") or "onnx" in name: uvr5_names.append(name.replace(".pth", "")) def change_choices(): names = [] for name in os.listdir(weight_root): if name.endswith(".pth"): names.append(name) index_paths = [] for root, dirs, files in os.walk(index_root, topdown=False): for name in files: if name.endswith(".index") and "trained" not in name: index_paths.append("%s/%s" % (root, name)) return {"choices": sorted(names), "__type__": "update"}, { "choices": sorted(index_paths), "__type__": "update", } def clean(): return {"value": "", "__type__": "update"} def export_onnx(ModelPath, ExportedPath): from infer.modules.onnx.export import export_onnx as eo eo(ModelPath, ExportedPath) sr_dict = { "32k": 32000, "40k": 40000, "48k": 48000, } def if_done(done, p): while 1: if p.poll() is None: sleep(0.5) else: break done[0] = True def if_done_multi(done, ps): while 1: # poll==None代表进程未结束 # 只要有一个进程未结束都不停 flag = 1 for p in ps: if p.poll() is None: flag = 0 sleep(0.5) break if flag == 1: break done[0] = True def preprocess_dataset(trainset_dir, exp_dir, sr, n_p): sr = sr_dict[sr] os.makedirs("%s/logs/%s" % (now_dir, exp_dir), exist_ok=True) f = open("%s/logs/%s/preprocess.log" % (now_dir, exp_dir), "w") f.close() cmd = '"%s" infer/modules/train/preprocess.py "%s" %s %s "%s/logs/%s" %s %.1f' % ( config.python_cmd, trainset_dir, sr, n_p, now_dir, exp_dir, config.noparallel, config.preprocess_per, ) logger.info("Execute: " + cmd) # , stdin=PIPE, stdout=PIPE,stderr=PIPE,cwd=now_dir p = Popen(cmd, shell=True) # 煞笔gr, popen read都非得全跑完了再一次性读取, 不用gr就正常读一句输出一句;只能额外弄出一个文本流定时读 done = [False] threading.Thread( target=if_done, args=( done, p, ), ).start() while 1: with open("%s/logs/%s/preprocess.log" % (now_dir, exp_dir), "r") as f: yield (f.read()) sleep(1) if done[0]: break with open("%s/logs/%s/preprocess.log" % (now_dir, exp_dir), "r") as f: log = f.read() logger.info(log) yield log # but2.click(extract_f0,[gpus6,np7,f0method8,if_f0_3,trainset_dir4],[info2]) def extract_f0_feature(gpus, n_p, f0method, if_f0, exp_dir, version19, gpus_rmvpe): gpus = gpus.split("-") os.makedirs("%s/logs/%s" % (now_dir, exp_dir), exist_ok=True) f = open("%s/logs/%s/extract_f0_feature.log" % (now_dir, exp_dir), "w") f.close() if if_f0: if f0method != "rmvpe_gpu": cmd = ( '"%s" infer/modules/train/extract/extract_f0_print.py "%s/logs/%s" %s %s' % ( config.python_cmd, now_dir, exp_dir, n_p, f0method, ) ) logger.info("Execute: " + cmd) p = Popen( cmd, shell=True, cwd=now_dir ) # , stdin=PIPE, stdout=PIPE,stderr=PIPE # 煞笔gr, popen read都非得全跑完了再一次性读取, 不用gr就正常读一句输出一句;只能额外弄出一个文本流定时读 done = [False] threading.Thread( target=if_done, args=( done, p, ), ).start() else: if gpus_rmvpe != "-": gpus_rmvpe = gpus_rmvpe.split("-") leng = len(gpus_rmvpe) ps = [] for idx, n_g in enumerate(gpus_rmvpe): cmd = ( '"%s" infer/modules/train/extract/extract_f0_rmvpe.py %s %s %s "%s/logs/%s" %s ' % ( config.python_cmd, leng, idx, n_g, now_dir, exp_dir, config.is_half, ) ) logger.info("Execute: " + cmd) p = Popen( cmd, shell=True, cwd=now_dir ) # , shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=now_dir ps.append(p) # 煞笔gr, popen read都非得全跑完了再一次性读取, 不用gr就正常读一句输出一句;只能额外弄出一个文本流定时读 done = [False] threading.Thread( target=if_done_multi, # args=( done, ps, ), ).start() else: cmd = ( config.python_cmd + ' infer/modules/train/extract/extract_f0_rmvpe_dml.py "%s/logs/%s" ' % ( now_dir, exp_dir, ) ) logger.info("Execute: " + cmd) p = Popen( cmd, shell=True, cwd=now_dir ) # , shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=now_dir p.wait() done = [True] while 1: with open( "%s/logs/%s/extract_f0_feature.log" % (now_dir, exp_dir), "r" ) as f: yield (f.read()) sleep(1) if done[0]: break with open("%s/logs/%s/extract_f0_feature.log" % (now_dir, exp_dir), "r") as f: log = f.read() logger.info(log) yield log # 对不同part分别开多进程 """ n_part=int(sys.argv[1]) i_part=int(sys.argv[2]) i_gpu=sys.argv[3] exp_dir=sys.argv[4] os.environ["CUDA_VISIBLE_DEVICES"]=str(i_gpu) """ leng = len(gpus) ps = [] for idx, n_g in enumerate(gpus): cmd = ( '"%s" infer/modules/train/extract_feature_print.py %s %s %s %s "%s/logs/%s" %s %s' % ( config.python_cmd, config.device, leng, idx, n_g, now_dir, exp_dir, version19, config.is_half, ) ) logger.info("Execute: " + cmd) p = Popen( cmd, shell=True, cwd=now_dir ) # , shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=now_dir ps.append(p) # 煞笔gr, popen read都非得全跑完了再一次性读取, 不用gr就正常读一句输出一句;只能额外弄出一个文本流定时读 done = [False] threading.Thread( target=if_done_multi, args=( done, ps, ), ).start() while 1: with open("%s/logs/%s/extract_f0_feature.log" % (now_dir, exp_dir), "r") as f: yield (f.read()) sleep(1) if done[0]: break with open("%s/logs/%s/extract_f0_feature.log" % (now_dir, exp_dir), "r") as f: log = f.read() logger.info(log) yield log def get_pretrained_models(path_str, f0_str, sr2): if_pretrained_generator_exist = os.access( "assets/pretrained%s/%sG%s.pth" % (path_str, f0_str, sr2), os.F_OK ) if_pretrained_discriminator_exist = os.access( "assets/pretrained%s/%sD%s.pth" % (path_str, f0_str, sr2), os.F_OK ) if not if_pretrained_generator_exist: logger.warning( "assets/pretrained%s/%sG%s.pth not exist, will not use pretrained model", path_str, f0_str, sr2, ) if not if_pretrained_discriminator_exist: logger.warning( "assets/pretrained%s/%sD%s.pth not exist, will not use pretrained model", path_str, f0_str, sr2, ) return ( ( "assets/pretrained%s/%sG%s.pth" % (path_str, f0_str, sr2) if if_pretrained_generator_exist else "" ), ( "assets/pretrained%s/%sD%s.pth" % (path_str, f0_str, sr2) if if_pretrained_discriminator_exist else "" ), ) def change_sr2(sr2, if_f0_3, version19): path_str = "" if version19 == "v1" else "_v2" f0_str = "f0" if if_f0_3 else "" return get_pretrained_models(path_str, f0_str, sr2) def change_version19(sr2, if_f0_3, version19): path_str = "" if version19 == "v1" else "_v2" if sr2 == "32k" and version19 == "v1": sr2 = "40k" to_return_sr2 = ( {"choices": ["40k", "48k"], "__type__": "update", "value": sr2} if version19 == "v1" else {"choices": ["40k", "48k", "32k"], "__type__": "update", "value": sr2} ) f0_str = "f0" if if_f0_3 else "" return ( *get_pretrained_models(path_str, f0_str, sr2), to_return_sr2, ) # f0method8,pretrained_G14,pretrained_D15 def change_f0(if_f0_3, sr2, version19): path_str = "" if version19 == "v1" else "_v2" return ( {"visible": if_f0_3, "__type__": "update"}, {"visible": if_f0_3, "__type__": "update"}, *get_pretrained_models(path_str, "f0" if if_f0_3 == True else "", sr2), ) # ckpt_path2.change(change_info_,[ckpt_path2],[sr__,if_f0__]) def change_info_(ckpt_path): if not os.path.exists(ckpt_path.replace(os.path.basename(ckpt_path), "train.log")): return {"__type__": "update"}, {"__type__": "update"}, {"__type__": "update"} try: with open( ckpt_path.replace(os.path.basename(ckpt_path), "train.log"), "r" ) as f: info = eval(f.read().strip("\n").split("\n")[0].split("\t")[-1]) sr, f0 = info["sample_rate"], info["if_f0"] version = "v2" if ("version" in info and info["version"] == "v2") else "v1" return sr, str(f0), version except: traceback.print_exc() return {"__type__": "update"}, {"__type__": "update"}, {"__type__": "update"} F0GPUVisible = config.dml == False def change_f0_method(f0method8): if f0method8 == "rmvpe_gpu": visible = F0GPUVisible else: visible = False return {"visible": visible, "__type__": "update"} with gr.Blocks(title="RVC WebUI") as app: gr.Markdown("## RVC WebUI") gr.Markdown( value=i18n( "本软件以MIT协议开源, 作者不对软件具备任何控制力, 使用软件者、传播软件导出的声音者自负全责.
如不认可该条款, 则不能使用或引用软件包内任何代码和文件. 详见根目录LICENSE." ) ) with gr.Tabs(): with gr.TabItem(i18n("模型推理")): with gr.Row(): sid0 = gr.Dropdown(label=i18n("推理音色"), choices=sorted(names)) with gr.Column(): refresh_button = gr.Button( i18n("刷新音色列表和索引路径"), variant="primary" ) clean_button = gr.Button(i18n("卸载音色省显存"), variant="primary") spk_item = gr.Slider( minimum=0, maximum=2333, step=1, label=i18n("请选择说话人id"), value=0, visible=False, interactive=True, ) clean_button.click( fn=clean, inputs=[], outputs=[sid0], api_name="infer_clean" ) with gr.TabItem(i18n("单次推理")): with gr.Group(): with gr.Row(): with gr.Column(): vc_transform0 = gr.Number( label=i18n("变调(整数, 半音数量, 升八度12降八度-12)"), value=0, ) input_audio0 = gr.File( label=i18n("输入待处理音频文件路径(默认是正确格式示例)"), file_count="single", file_types=[".index"], interactive=True ) file_index1 = gr.Textbox( label=i18n( "特征检索库文件路径,为空则使用下拉的选择结果" ), placeholder="C:\\Users\\Desktop\\model_example.index", interactive=True, ) file_index2 = gr.Dropdown( label=i18n("自动检测index路径,下拉式选择(dropdown)"), choices=sorted(index_paths), interactive=True, ) f0method0 = gr.Radio( label=i18n( "选择音高提取算法,输入歌声可用pm提速,harvest低音好但巨慢无比,crepe效果好但吃GPU,rmvpe效果最好且微吃GPU" ), choices=( ["pm", "harvest", "crepe", "rmvpe"] if config.dml == False else ["pm", "harvest", "rmvpe"] ), value="rmvpe", interactive=True, ) with gr.Column(): resample_sr0 = gr.Slider( minimum=0, maximum=48000, label=i18n("后处理重采样至最终采样率,0为不进行重采样"), value=0, step=1, interactive=True, ) rms_mix_rate0 = gr.Slider( minimum=0, maximum=1, label=i18n( "输入源音量包络替换输出音量包络融合比例,越靠近1越使用输出包络" ), value=0.25, interactive=True, ) protect0 = gr.Slider( minimum=0, maximum=0.5, label=i18n( "保护清辅音和呼吸声,防止电音撕裂等artifact,拉满0.5不开启,调低加大保护力度但可能降低索引效果" ), value=0.33, step=0.01, interactive=True, ) filter_radius0 = gr.Slider( minimum=0, maximum=7, label=i18n( ">=3则使用对harvest音高识别的结果使用中值滤波,数值为滤波半径,使用可以削弱哑音" ), value=3, step=1, interactive=True, ) index_rate1 = gr.Slider( minimum=0, maximum=1, label=i18n("检索特征占比"), value=0.75, interactive=True, ) f0_file = gr.File( label=i18n( "F0曲线文件, 可选, 一行一个音高, 代替默认F0及升降调" ), visible=False, ) refresh_button.click( fn=change_choices, inputs=[], outputs=[sid0, file_index2], api_name="infer_refresh", ) with gr.Group(): with gr.Column(): but0 = gr.Button(i18n("转换"), variant="primary") with gr.Row(): vc_output1 = gr.Textbox(label=i18n("输出信息")) vc_output2 = gr.Audio( label=i18n("输出音频(右下角三个点,点了可以下载)") ) but0.click( vc.vc_single, [ spk_item, input_audio0, vc_transform0, f0_file, f0method0, file_index1, file_index2, index_rate1, filter_radius0, resample_sr0, rms_mix_rate0, protect0, ], [vc_output1, vc_output2], api_name="infer_convert", ) with gr.TabItem(i18n("批量推理")): gr.Markdown( value=i18n( "批量转换, 输入待转换音频文件夹, 或上传多个音频文件, 在指定文件夹(默认opt)下输出转换的音频. " ) ) with gr.Row(): with gr.Column(): vc_transform1 = gr.Number( label=i18n("变调(整数, 半音数量, 升八度12降八度-12)"), value=0, ) opt_input = gr.Textbox( label=i18n("指定输出文件夹"), value="opt" ) file_index3 = gr.Textbox( label=i18n("特征检索库文件路径,为空则使用下拉的选择结果"), value="", interactive=True, ) file_index4 = gr.Dropdown( label=i18n("自动检测index路径,下拉式选择(dropdown)"), choices=sorted(index_paths), interactive=True, ) f0method1 = gr.Radio( label=i18n( "选择音高提取算法,输入歌声可用pm提速,harvest低音好但巨慢无比,crepe效果好但吃GPU,rmvpe效果最好且微吃GPU" ), choices=( ["pm", "harvest", "crepe", "rmvpe"] if config.dml == False else ["pm", "harvest", "rmvpe"] ), value="rmvpe", interactive=True, ) format1 = gr.Radio( label=i18n("导出文件格式"), choices=["wav", "flac", "mp3", "m4a"], value="wav", interactive=True, ) refresh_button.click( fn=lambda: change_choices()[1], inputs=[], outputs=file_index4, api_name="infer_refresh_batch", ) with gr.Column(): resample_sr1 = gr.Slider( minimum=0, maximum=48000, label=i18n("后处理重采样至最终采样率,0为不进行重采样"), value=0, step=1, interactive=True, ) rms_mix_rate1 = gr.Slider( minimum=0, maximum=1, label=i18n( "输入源音量包络替换输出音量包络融合比例,越靠近1越使用输出包络" ), value=1, interactive=True, ) protect1 = gr.Slider( minimum=0, maximum=0.5, label=i18n( "保护清辅音和呼吸声,防止电音撕裂等artifact,拉满0.5不开启,调低加大保护力度但可能降低索引效果" ), value=0.33, step=0.01, interactive=True, ) filter_radius1 = gr.Slider( minimum=0, maximum=7, label=i18n( ">=3则使用对harvest音高识别的结果使用中值滤波,数值为滤波半径,使用可以削弱哑音" ), value=3, step=1, interactive=True, ) index_rate2 = gr.Slider( minimum=0, maximum=1, label=i18n("检索特征占比"), value=1, interactive=True, ) with gr.Row(): dir_input = gr.Textbox( label=i18n( "输入待处理音频文件夹路径(去文件管理器地址栏拷就行了)" ), placeholder="C:\\Users\\Desktop\\input_vocal_dir", ) inputs = gr.File( file_count="multiple", label=i18n("也可批量输入音频文件, 二选一, 优先读文件夹"), ) with gr.Row(): but1 = gr.Button(i18n("转换"), variant="primary") vc_output3 = gr.Textbox(label=i18n("输出信息")) but1.click( vc.vc_multi, [ spk_item, dir_input, opt_input, inputs, vc_transform1, f0method1, file_index3, file_index4, # file_big_npy2, index_rate2, filter_radius1, resample_sr1, rms_mix_rate1, protect1, format1, ], [vc_output3], api_name="infer_convert_batch", ) sid0.change( fn=vc.get_vc, inputs=[sid0, protect0, protect1], outputs=[spk_item, protect0, protect1, file_index2, file_index4], api_name="infer_change_voice", ) if config.iscolab: app.queue(concurrency_count=511, max_size=1022).launch(share=True) else: app.queue(concurrency_count=511, max_size=1022).launch( server_name="localhost", inbrowser=not config.noautoopen, server_port=config.listen_port, quiet=True, )