# This listing un-flattens the patch that adds the ONNX inference helpers.
# Each banner names the file the section belongs to in the patch.
# The patch also adds an empty infer_pack/modules/F0Predictor/__init__.py.
#
# Third-party backends (pyworld, parselmouth, onnxruntime, librosa, soundfile)
# are imported inside the functions that use them, mirroring the lazy imports
# get_f0_predictor() already performs for the predictor modules.

import numpy as np


# ---------------------------------------------------------------------------
# infer_pack/modules/F0Predictor/F0Predictor.py
# ---------------------------------------------------------------------------
class F0Predictor(object):
    """Common interface and shared helpers for the F0 (pitch) predictors.

    Subclasses implement ``compute_f0_uv``; the constructor and the
    ``interpolate_f0`` / ``resize_f0`` helpers live here because every
    concrete predictor previously carried an identical copy of them.
    """

    def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
        """Store the analysis settings read by the concrete predictors.

        Args:
            hop_length: hop size in samples between two F0 frames.
            f0_min: lower bound handed to the pitch estimator.
            f0_max: upper bound handed to the pitch estimator.
            sampling_rate: sampling rate of the waveforms passed in.
        """
        self.hop_length = hop_length
        self.f0_min = f0_min
        self.f0_max = f0_max
        self.sampling_rate = sampling_rate

    def compute_f0(self, wav, p_len=None):
        """Return the interpolated F0 track for ``wav``.

        input: wav:[signal_length]
               p_len:int
        output: f0:[signal_length//hop_length]
        """
        raise NotImplementedError

    def compute_f0_uv(self, wav, p_len=None):
        """Return the interpolated F0 track and the voiced/unvoiced flags.

        input: wav:[signal_length]
               p_len:int
        output: f0:[signal_length//hop_length],uv:[signal_length//hop_length]
        """
        raise NotImplementedError

    def interpolate_f0(self, f0):
        """Fill unvoiced frames (values <= 0) of an F0 track.

        * Frames before the first voiced frame take that frame's value.
        * Frames after the last voiced frame take that frame's value.
        * Frames between two voiced frames are ramped linearly. The step is
          computed over the number of unvoiced frames in the gap, so the last
          frame of the gap already equals the following voiced value; this
          spacing is kept exactly as before so existing outputs are reproduced.
        * If no frame is voiced the result is all zeros.

        Unlike the previous implementation the input array is not modified,
        and a voiced final frame is no longer overwritten.

        Args:
            f0: array of F0 values; it is flattened before processing.

        Returns:
            ``(f0_filled, vuv)`` as two 1-D arrays of equal length. ``vuv`` is
            float32 with 1.0 where the input frame was > 0 and 0.0 elsewhere.
        """
        flat = np.asarray(f0).reshape(-1)
        voiced_mask = flat > 0.0
        vuv = voiced_mask.astype(np.float32)
        filled = flat.copy()

        voiced = np.flatnonzero(voiced_mask)
        if voiced.size == 0:
            # Nothing to anchor on: the original filled everything with 0.0.
            filled[:] = 0.0
            return filled, vuv

        first, last = voiced[0], voiced[-1]
        filled[:first] = flat[first]
        filled[last + 1:] = flat[last]

        # Only visit neighbouring voiced frames that have a gap between them.
        for pos in np.flatnonzero(np.diff(voiced) > 1):
            left, right = voiced[pos], voiced[pos + 1]
            gap = right - left - 1
            step = (flat[right] - flat[left]) / float(gap)
            filled[left + 1:right] = flat[left] + step * np.arange(1, gap + 1)

        return filled, vuv

    def resize_f0(self, x, target_len):
        """Resample an F0 track to ``target_len`` frames by linear interpolation.

        Values below 0.001 are treated as missing (NaN) while interpolating and
        any NaN in the result is turned back into 0.

        Args:
            x: sequence of F0 values.
            target_len: number of frames in the result.

        Returns:
            1-D float array with ``target_len`` entries.
        """
        # Force a float array so NaN can be stored even for integer input.
        source = np.array(x, dtype=float)
        if source.size == 0:
            # np.arange() below would be asked for a step of 0.
            return np.zeros(target_len)
        source[source < 0.001] = np.nan
        positions = np.arange(0, len(source) * target_len, len(source)) / target_len
        target = np.interp(positions, np.arange(0, len(source)), source)
        return np.nan_to_num(target)


# ---------------------------------------------------------------------------
# infer_pack/modules/F0Predictor/DioF0Predictor.py
# (in the package layout: from infer_pack.modules.F0Predictor.F0Predictor
#  import F0Predictor)
# ---------------------------------------------------------------------------
class DioF0Predictor(F0Predictor):
    """F0 predictor using ``pyworld.dio`` refined by ``pyworld.stonemask``."""

    def compute_f0(self, wav, p_len=None):
        """Return only the F0 track of :meth:`compute_f0_uv`."""
        return self.compute_f0_uv(wav, p_len)[0]

    def compute_f0_uv(self, wav, p_len=None):
        """Estimate F0 with DIO and return ``(f0, uv)`` resized to ``p_len``.

        Args:
            wav: 1-D waveform array sampled at ``self.sampling_rate``.
            p_len: number of output frames; defaults to
                ``wav.shape[0] // self.hop_length``.
        """
        import pyworld  # imported here so only the selected backend is required

        if p_len is None:
            p_len = wav.shape[0] // self.hop_length
        signal = wav.astype(np.double)
        f0, t = pyworld.dio(
            signal,
            fs=self.sampling_rate,
            f0_floor=self.f0_min,
            f0_ceil=self.f0_max,
            # hop length in seconds, times 1000
            frame_period=1000 * self.hop_length / self.sampling_rate,
        )
        f0 = pyworld.stonemask(signal, f0, t, self.sampling_rate)
        # Round to one decimal, as the element-by-element loop did.
        f0 = np.round(f0, 1)
        return self.interpolate_f0(self.resize_f0(f0, p_len))


# ---------------------------------------------------------------------------
# infer_pack/modules/F0Predictor/HarvestF0Predictor.py
# (in the package layout: from infer_pack.modules.F0Predictor.F0Predictor
#  import F0Predictor)
# ---------------------------------------------------------------------------
class HarvestF0Predictor(F0Predictor):
    """F0 predictor using ``pyworld.harvest`` refined by ``pyworld.stonemask``."""

    def compute_f0(self, wav, p_len=None):
        """Return only the F0 track of :meth:`compute_f0_uv`.

        The previous body passed ``fs=self.hop_length`` to harvest and read the
        non-existent ``self.fs``; delegating keeps both entry points on the
        same, correct code path.
        """
        return self.compute_f0_uv(wav, p_len)[0]

    def compute_f0_uv(self, wav, p_len=None):
        """Estimate F0 with Harvest and return ``(f0, uv)`` resized to ``p_len``.

        Args:
            wav: 1-D waveform array sampled at ``self.sampling_rate``.
            p_len: number of output frames; defaults to
                ``wav.shape[0] // self.hop_length``.
        """
        import pyworld  # imported here so only the selected backend is required

        if p_len is None:
            p_len = wav.shape[0] // self.hop_length
        signal = wav.astype(np.double)
        f0, t = pyworld.harvest(
            signal,
            fs=self.sampling_rate,
            f0_floor=self.f0_min,
            f0_ceil=self.f0_max,
            frame_period=1000 * self.hop_length / self.sampling_rate,
        )
        f0 = pyworld.stonemask(signal, f0, t, self.sampling_rate)
        return self.interpolate_f0(self.resize_f0(f0, p_len))


# ---------------------------------------------------------------------------
# infer_pack/modules/F0Predictor/PMF0Predictor.py
# (in the package layout: from infer_pack.modules.F0Predictor.F0Predictor
#  import F0Predictor)
# ---------------------------------------------------------------------------
class PMF0Predictor(F0Predictor):
    """F0 predictor using parselmouth's ``to_pitch_ac``."""

    def compute_f0(self, wav, p_len=None):
        """Return only the F0 track of :meth:`compute_f0_uv`."""
        return self.compute_f0_uv(wav, p_len)[0]

    def compute_f0_uv(self, wav, p_len=None):
        """Estimate F0 with parselmouth and return ``(f0, uv)``.

        The raw track is zero-padded on both sides up to ``p_len`` frames
        before interpolation. A track longer than ``p_len`` is left as is.

        Args:
            wav: 1-D waveform array sampled at ``self.sampling_rate``.
            p_len: number of output frames; defaults to
                ``wav.shape[0] // self.hop_length``. An explicit value must be
                within 4 frames of that default.
        """
        import parselmouth  # imported here so only the selected backend is required

        expected_len = wav.shape[0] // self.hop_length
        if p_len is None:
            p_len = expected_len
        else:
            assert abs(p_len - expected_len) < 4, "pad length error"

        # Kept as the original two-step computation so the value handed to
        # parselmouth is bit-for-bit the same.
        time_step = self.hop_length / self.sampling_rate * 1000
        f0 = parselmouth.Sound(wav, self.sampling_rate).to_pitch_ac(
            time_step=time_step / 1000,
            voicing_threshold=0.6,
            pitch_floor=self.f0_min,
            pitch_ceiling=self.f0_max,
        ).selected_array['frequency']

        missing = p_len - len(f0)
        pad_front = (missing + 1) // 2
        pad_back = missing - pad_front
        if pad_front > 0 or pad_back > 0:
            f0 = np.pad(f0, [[pad_front, pad_back]], mode='constant')
        return self.interpolate_f0(f0)


# ---------------------------------------------------------------------------
# infer_pack/onnx_inference.py
# ---------------------------------------------------------------------------
def _select_providers(device):
    """Map a device name to the ONNX Runtime execution-provider list."""
    if device == 'cpu' or device is None:
        return ['CPUExecutionProvider']
    if device == 'cuda':
        return ['CUDAExecutionProvider', 'CPUExecutionProvider']
    raise RuntimeError("Unsupported Device")


def _coarse_pitch(pitchf, f0_min=50, f0_max=1100):
    """Quantise an F0 track to integer bins 1..255 on the mel-style scale.

    Uses ``1127 * ln(1 + f / 700)``, maps ``[f0_min, f0_max]`` linearly onto
    ``[1, 255]``, clamps to that range and rounds to the nearest integer.
    """
    f0_mel_min = 1127 * np.log(1 + f0_min / 700)
    f0_mel_max = 1127 * np.log(1 + f0_max / 700)
    f0_mel = 1127 * np.log(1 + pitchf / 700)
    positive = f0_mel > 0
    f0_mel[positive] = (f0_mel[positive] - f0_mel_min) * 254 / (
        f0_mel_max - f0_mel_min
    ) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255
    return np.rint(f0_mel).astype(np.int64)


class ContentVec():
    """Thin wrapper around an ONNX content-vector model."""

    def __init__(self, vec_path="pretrained/vec-768-layer-12.onnx", device=None):
        """Load the ONNX model at ``vec_path``.

        Args:
            vec_path: path of the ``.onnx`` file.
            device: ``'cpu'``, ``'cuda'`` or ``None`` (treated as CPU).

        Raises:
            RuntimeError: if ``device`` is none of the above.
        """
        import onnxruntime

        print("load model(s) from {}".format(vec_path))
        self.model = onnxruntime.InferenceSession(
            vec_path, providers=_select_providers(device)
        )

    def __call__(self, wav):
        return self.forward(wav)

    def forward(self, wav):
        """Run the model on a waveform and return the transposed output.

        A 2-D input is averaged over its last axis (double channels) first.
        The 1-D signal is then given two leading axes of size 1, fed to the
        model's first input, and the first output is returned with its last
        two axes swapped.
        """
        feats = wav
        if feats.ndim == 2:  # double channels
            feats = feats.mean(-1)
        assert feats.ndim == 1, feats.ndim
        feats = np.expand_dims(np.expand_dims(feats, 0), 0)
        onnx_input = {self.model.get_inputs()[0].name: feats}
        logits = self.model.run(None, onnx_input)[0]
        return logits.transpose(0, 2, 1)


def get_f0_predictor(f0_predictor, hop_length, sampling_rate, **kargs):
    """Create the F0 predictor named by ``f0_predictor``.

    Args:
        f0_predictor: ``"pm"``, ``"harvest"`` or ``"dio"``.
        hop_length: forwarded to the predictor constructor.
        sampling_rate: forwarded to the predictor constructor.
        **kargs: accepted for call compatibility; currently not forwarded.

    Raises:
        Exception: if the name is not one of the three known predictors.
    """
    # Each module is imported only when selected, so only that predictor's
    # backend has to be installed.
    if f0_predictor == "pm":
        from infer_pack.modules.F0Predictor.PMF0Predictor import PMF0Predictor
        f0_predictor_object = PMF0Predictor(hop_length=hop_length, sampling_rate=sampling_rate)
    elif f0_predictor == "harvest":
        from infer_pack.modules.F0Predictor.HarvestF0Predictor import HarvestF0Predictor
        f0_predictor_object = HarvestF0Predictor(hop_length=hop_length, sampling_rate=sampling_rate)
    elif f0_predictor == "dio":
        from infer_pack.modules.F0Predictor.DioF0Predictor import DioF0Predictor
        f0_predictor_object = DioF0Predictor(hop_length=hop_length, sampling_rate=sampling_rate)
    else:
        raise Exception("Unknown f0 predictor")
    return f0_predictor_object


class OnnxRVC():
    """Voice conversion with an ONNX model plus a ContentVec feature model."""

    def __init__(
        self,
        model_path,
        sr=40000,
        hop_size=512,
        vec_path="vec-768-layer-12",
        device="cpu"
    ):
        """Load both ONNX models.

        Args:
            model_path: path of the conversion model (``.onnx``).
            sr: sampling rate used for loading audio and for F0 extraction.
            hop_size: hop length handed to the F0 predictor.
            vec_path: model name, expanded to ``pretrained/{vec_path}.onnx``.
            device: ``'cpu'``, ``'cuda'`` or ``None`` (treated as CPU).

        Raises:
            RuntimeError: if ``device`` is none of the above.
        """
        import onnxruntime

        vec_path = f"pretrained/{vec_path}.onnx"
        self.vec_model = ContentVec(vec_path, device)
        self.model = onnxruntime.InferenceSession(
            model_path, providers=_select_providers(device)
        )
        self.sampling_rate = sr
        self.hop_size = hop_size

    def forward(self, hubert, hubert_length, pitch, pitchf, ds, rnd):
        """Run the conversion model and return int16 audio.

        The six arguments are bound, in order, to the model's first six
        inputs. The first output is scaled by 32767 and clipped to the int16
        range before the cast, so overshoot saturates instead of wrapping.
        """
        model_inputs = self.model.get_inputs()
        values = (hubert, hubert_length, pitch, pitchf, ds, rnd)
        onnx_input = {
            model_inputs[idx].name: value for idx, value in enumerate(values)
        }
        audio = self.model.run(None, onnx_input)[0] * 32767
        return np.clip(audio, -32768, 32767).astype(np.int16)

    def inference(self, raw_path, sid, f0_method="dio", f0_up_key=0, pad_time=0.5, cr_threshold=0.02):
        """Convert the audio at ``raw_path`` and return int16 samples.

        Args:
            raw_path: audio input passed to ``librosa.load``.
            sid: speaker id fed to the model.
            f0_method: predictor name understood by ``get_f0_predictor``.
            f0_up_key: pitch shift in semitones (F0 is scaled by
                ``2 ** (f0_up_key / 12)``).
            pad_time: accepted for call compatibility; not used here.
            cr_threshold: forwarded to ``get_f0_predictor`` as ``threshold``.

        Returns:
            1-D int16 array trimmed to the length of the loaded input.

        Raises:
            RuntimeError: if the loaded audio is longer than 50 seconds.
        """
        import librosa

        f0_predictor = get_f0_predictor(
            f0_method,
            hop_length=self.hop_size,
            sampling_rate=self.sampling_rate,
            threshold=cr_threshold
        )
        wav, sr = librosa.load(raw_path, sr=self.sampling_rate)
        org_length = len(wav)
        if org_length / sr > 50.:
            raise RuntimeError("Reached Max Length")

        # The feature model is fed a 16 kHz copy of the signal.
        wav16k = librosa.resample(wav, orig_sr=self.sampling_rate, target_sr=16000)

        hubert = self.vec_model(wav16k)
        # Repeat every entry twice along the last axis, then swap the last two axes.
        hubert = np.repeat(hubert, 2, axis=2).transpose(0, 2, 1).astype(np.float32)
        hubert_length = hubert.shape[1]

        pitchf = f0_predictor.compute_f0(wav, hubert_length)
        pitchf = pitchf * 2 ** (f0_up_key / 12)
        pitch = _coarse_pitch(pitchf, 50, 1100)

        pitchf = pitchf.reshape(1, len(pitchf)).astype(np.float32)
        pitch = pitch.reshape(1, len(pitch))
        ds = np.array([sid]).astype(np.int64)

        rnd = np.random.randn(1, 192, hubert_length).astype(np.float32)
        hubert_length = np.array([hubert_length]).astype(np.int64)

        out_wav = self.forward(hubert, hubert_length, pitch, pitchf, ds, rnd).squeeze()
        # Pad before trimming so a slightly short output still reaches org_length.
        out_wav = np.pad(out_wav, (0, 2 * self.hop_size), 'constant')
        return out_wav[0:org_length]


# ---------------------------------------------------------------------------
# onnx_inference_demo.py
# ---------------------------------------------------------------------------
def main():
    """Convert ``123.wav`` with ``ShirohaRVC.onnx`` and write ``out.wav``."""
    import soundfile
    from infer_pack.onnx_inference import OnnxRVC

    hop_size = 512
    sampling_rate = 40000           # sampling rate
    f0_up_key = 0                   # pitch shift up/down
    sid = 0                         # speaker (character) ID
    f0_method = "dio"               # F0 extraction algorithm
    model_path = "ShirohaRVC.onnx"  # full path of the model
    vec_name = "vec-256-layer-9"    # expanded internally to f"pretrained/{vec_name}.onnx"; needs the ONNX vec model
    wav_path = "123.wav"            # input path or ByteIO instance
    out_path = "out.wav"            # output path or ByteIO instance

    model = OnnxRVC(model_path, vec_path=vec_name, sr=sampling_rate, hop_size=hop_size, device="cuda")

    audio = model.inference(wav_path, sid, f0_method=f0_method, f0_up_key=f0_up_key)

    soundfile.write(out_path, audio, sampling_rate)


if __name__ == "__main__":
    main()