Source code for nnsvs.acoustic_models.multistream

import torch
from nnsvs.acoustic_models.util import pad_inference
from nnsvs.base import BaseModel, PredictionType
from nnsvs.multistream import split_streams
from torch import nn

__all__ = [
    "MultistreamSeparateF0ParametricModel",
    "NPSSMultistreamParametricModel",
    "NPSSMDNMultistreamParametricModel",
    "MultistreamSeparateF0MelModel",
    "MDNMultistreamSeparateF0MelModel",
]


class MultistreamSeparateF0ParametricModel(BaseModel):
    """Multi-stream model with a separate F0 prediction model

    acoustic features: [MGC, LF0, VUV, BAP]

    vib_model and vib_flags_model are optional and are likely to be removed.

    Conditional dependency:
    p(MGC, LF0, VUV, BAP |C) = p(LF0|C) p(MGC|LF0, C) p(BAP|LF0, C) p(VUV|LF0, C)

    Args:
        in_dim (int): Input dimension.
        out_dim (int): Output dimension.
        stream_sizes (list): List of stream sizes.
        reduction_factor (int): Reduction factor.
        encoder (nn.Module): A shared encoder.
        mgc_model (nn.Module): MGC prediction model.
        lf0_model (nn.Module): log-F0 prediction model.
        vuv_model (nn.Module): V/UV prediction model.
        bap_model (nn.Module): BAP prediction model.
        in_rest_idx (int): Index of the rest symbol in the input features.
        in_lf0_idx (int): Index of log-F0 in the input features.
        in_lf0_min (float): Minimum value of log-F0 in the training data of input features.
        in_lf0_max (float): Maximum value of log-F0 in the training data of input features.
        out_lf0_idx (int): Index of log-F0 in the output features. Typically 180.
        out_lf0_mean (float): Mean of log-F0 in the training data of output features.
        out_lf0_scale (float): Scale of log-F0 in the training data of output features.
        lf0_teacher_forcing (bool): Whether to use teacher forcing for F0 prediction.
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        stream_sizes: list,
        reduction_factor: int,
        encoder: nn.Module,
        mgc_model: nn.Module,
        lf0_model: nn.Module,
        vuv_model: nn.Module,
        bap_model: nn.Module,
        vib_model: nn.Module = None,  # kept as is for compatibility
        vib_flags_model: nn.Module = None,  # kept as is for compatibility
        # NOTE: you must carefully set the following parameters
        in_rest_idx=1,
        in_lf0_idx=300,
        in_lf0_min=5.3936276,
        in_lf0_max=6.491111,
        out_lf0_idx=180,
        out_lf0_mean=5.953093881972361,
        out_lf0_scale=0.23435173188961034,
        lf0_teacher_forcing=True,
    ):
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.stream_sizes = stream_sizes
        self.reduction_factor = reduction_factor
        self.lf0_teacher_forcing = lf0_teacher_forcing

        assert len(stream_sizes) == 4

        self.encoder = encoder
        if self.encoder is not None:
            assert not encoder.is_autoregressive()
        self.mgc_model = mgc_model
        self.lf0_model = lf0_model
        self.vuv_model = vuv_model
        self.bap_model = bap_model
        self.in_rest_idx = in_rest_idx
        self.in_lf0_idx = in_lf0_idx
        self.in_lf0_min = in_lf0_min
        self.in_lf0_max = in_lf0_max
        self.out_lf0_idx = out_lf0_idx
        self.out_lf0_mean = out_lf0_mean
        self.out_lf0_scale = out_lf0_scale

    def has_residual_lf0_prediction(self):
        return True

    def _set_lf0_params(self):
        # Special care for residual F0 prediction models
        # NOTE: don't overwrite out_lf0_idx and in_lf0_idx
        if hasattr(self.lf0_model, "out_lf0_mean"):
            self.lf0_model.in_lf0_min = self.in_lf0_min
            self.lf0_model.in_lf0_max = self.in_lf0_max
            self.lf0_model.out_lf0_mean = self.out_lf0_mean
            self.lf0_model.out_lf0_scale = self.out_lf0_scale

    def is_autoregressive(self):
        return (
            self.mgc_model.is_autoregressive()
            or self.lf0_model.is_autoregressive()
            or self.vuv_model.is_autoregressive()
            or self.bap_model.is_autoregressive()
        )

    def forward(self, x, lengths=None, y=None):
        self._set_lf0_params()
        assert x.shape[-1] == self.in_dim

        if y is not None:
            # Teacher-forcing
            y_mgc, y_lf0, y_vuv, y_bap = split_streams(y, self.stream_sizes)
        else:
            # Inference
            y_mgc, y_lf0, y_vuv, y_bap = None, None, None, None

        # Predict continuous log-F0 first
        lf0, lf0_residual = self.lf0_model(x, lengths, y_lf0)

        if self.encoder is not None:
            encoder_outs = self.encoder(x, lengths)
            # Concat log-F0, rest flags and the outputs of the encoder
            # This may make the decoder aware of the input F0
            rest_flags = x[:, :, self.in_rest_idx].unsqueeze(-1)
            if self.lf0_teacher_forcing and y is not None:
                encoder_outs = torch.cat([encoder_outs, rest_flags, y_lf0], dim=-1)
            else:
                encoder_outs = torch.cat([encoder_outs, rest_flags, lf0], dim=-1)
        else:
            encoder_outs = x

        # Decoders for each stream
        mgc = self.mgc_model(encoder_outs, lengths, y_mgc)
        vuv = self.vuv_model(encoder_outs, lengths, y_vuv)
        bap = self.bap_model(encoder_outs, lengths, y_bap)

        # Make a concatenated stream
        has_postnet_output = (
            isinstance(mgc, list)
            or isinstance(lf0, list)
            or isinstance(vuv, list)
            or isinstance(bap, list)
        )
        if has_postnet_output:
            outs = []
            for idx in range(len(mgc)):
                mgc_ = mgc[idx] if isinstance(mgc, list) else mgc
                lf0_ = lf0[idx] if isinstance(lf0, list) else lf0
                vuv_ = vuv[idx] if isinstance(vuv, list) else vuv
                bap_ = bap[idx] if isinstance(bap, list) else bap
                out = torch.cat([mgc_, lf0_, vuv_, bap_], dim=-1)
                assert out.shape[-1] == self.out_dim
                outs.append(out)
            return outs, lf0_residual
        else:
            out = torch.cat([mgc, lf0, vuv, bap], dim=-1)
            assert out.shape[-1] == self.out_dim

        return out, lf0_residual

    def inference(self, x, lengths=None):
        return pad_inference(
            model=self, x=x, lengths=lengths, reduction_factor=self.reduction_factor
        )
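
# Illustrative sketch, not part of the original module: how ``split_streams``
# carves a concatenated acoustic feature tensor into the four streams used by
# the models above. The stream sizes (60-dim MGC, 1-dim continuous log-F0,
# 1-dim V/UV, 5-dim BAP) are assumptions for illustration; actual values come
# from the feature extraction config.
def _demo_split_streams():
    B, T = 2, 100  # arbitrary batch size and number of frames
    stream_sizes = [60, 1, 1, 5]
    y = torch.zeros(B, T, sum(stream_sizes))
    mgc, lf0, vuv, bap = split_streams(y, stream_sizes)
    # Streams are sliced along the feature axis, in order
    assert mgc.shape == (B, T, 60)
    assert bap.shape == (B, T, 5)
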
class NPSSMultistreamParametricModel(BaseModel):
    """NPSS-like cascaded multi-stream model with no mixture density networks.

    NPSS: :cite:t:`blaauw2017neural`

    Unlike the original NPSS, we don't use spectral parameters as inputs to the
    aperiodicity and V/UV prediction models. This is because
    (1) D4C does not use spectral parameters as input for aperiodicity estimation, and
    (2) V/UV detection is done from aperiodicity at 0-3 kHz in WORLD.
    In addition, the F0 and V/UV models don't use MDNs.

    Empirically, we found that the above configuration works better than the
    original one.

    Conditional dependency:
    p(MGC, LF0, VUV, BAP |C) = p(LF0|C) p(MGC|LF0, C) p(BAP|LF0, C) p(VUV|LF0, BAP, C)

    Args:
        in_dim (int): Input dimension.
        out_dim (int): Output dimension.
        stream_sizes (list): List of stream sizes.
        reduction_factor (int): Reduction factor.
        lf0_model (BaseModel): Model for predicting log-F0.
        mgc_model (BaseModel): Model for predicting MGC.
        bap_model (BaseModel): Model for predicting BAP.
        vuv_model (BaseModel): Model for predicting V/UV.
        in_rest_idx (int): Index of the rest symbol in the input features.
        in_lf0_idx (int): Index of log-F0 in the input features.
        in_lf0_min (float): Minimum value of log-F0 in the training data of input features.
        in_lf0_max (float): Maximum value of log-F0 in the training data of input features.
        out_lf0_idx (int): Index of log-F0 in the output features. Typically 60.
        out_lf0_mean (float): Mean of log-F0 in the training data of output features.
        out_lf0_scale (float): Scale of log-F0 in the training data of output features.
        npss_style_conditioning (bool): Reserved; must be False (not supported).
        vuv_model_bap_conditioning (bool): If True, use BAP features for V/UV prediction.
        vuv_model_bap0_conditioning (bool): If True, use only the 0-th coefficient
            of BAP for V/UV prediction.
        vuv_model_lf0_conditioning (bool): If True, use log-F0 features for V/UV prediction.
        vuv_model_mgc_conditioning (bool): If True, use MGC features for V/UV prediction.
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        stream_sizes: list,
        reduction_factor: int,
        lf0_model: nn.Module,
        mgc_model: nn.Module,
        bap_model: nn.Module,
        vuv_model: nn.Module,
        # NOTE: you must carefully set the following parameters
        in_rest_idx=0,
        in_lf0_idx=51,
        in_lf0_min=5.3936276,
        in_lf0_max=6.491111,
        out_lf0_idx=60,
        out_lf0_mean=5.953093881972361,
        out_lf0_scale=0.23435173188961034,
        npss_style_conditioning=False,
        vuv_model_bap_conditioning=True,
        vuv_model_bap0_conditioning=False,
        vuv_model_lf0_conditioning=True,
        vuv_model_mgc_conditioning=False,
    ):
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.stream_sizes = stream_sizes
        self.reduction_factor = reduction_factor
        self.vuv_model_bap_conditioning = vuv_model_bap_conditioning
        self.vuv_model_bap0_conditioning = vuv_model_bap0_conditioning
        self.vuv_model_lf0_conditioning = vuv_model_lf0_conditioning
        self.vuv_model_mgc_conditioning = vuv_model_mgc_conditioning

        assert not npss_style_conditioning, "Not supported"
        assert len(stream_sizes) == 4

        self.lf0_model = lf0_model
        self.mgc_model = mgc_model
        self.bap_model = bap_model
        self.vuv_model = vuv_model
        self.in_rest_idx = in_rest_idx
        self.in_lf0_idx = in_lf0_idx
        self.in_lf0_min = in_lf0_min
        self.in_lf0_max = in_lf0_max
        self.out_lf0_idx = out_lf0_idx
        self.out_lf0_mean = out_lf0_mean
        self.out_lf0_scale = out_lf0_scale

    def _set_lf0_params(self):
        # Special care for residual F0 prediction models
        # NOTE: don't overwrite out_lf0_idx and in_lf0_idx
        if hasattr(self.lf0_model, "out_lf0_mean"):
            self.lf0_model.in_lf0_min = self.in_lf0_min
            self.lf0_model.in_lf0_max = self.in_lf0_max
            self.lf0_model.out_lf0_mean = self.out_lf0_mean
            self.lf0_model.out_lf0_scale = self.out_lf0_scale

    def prediction_type(self):
        return PredictionType.DETERMINISTIC

    def is_autoregressive(self):
        return (
            self.mgc_model.is_autoregressive()
            or self.lf0_model.is_autoregressive()
            or self.vuv_model.is_autoregressive()
            or self.bap_model.is_autoregressive()
        )

    def has_residual_lf0_prediction(self):
        return True

    def forward(self, x, lengths=None, y=None):
        self._set_lf0_params()
        assert x.shape[-1] == self.in_dim
        is_inference = y is None

        if is_inference:
            y_mgc, y_lf0, y_vuv, y_bap = None, None, None, None
        else:
            # Teacher-forcing
            y_mgc, y_lf0, y_vuv, y_bap = split_streams(y, self.stream_sizes)

        # Predict continuous log-F0 first
        if is_inference:
            lf0, lf0_residual = self.lf0_model.inference(x, lengths), None
        else:
            lf0, lf0_residual = self.lf0_model(x, lengths, y_lf0)

        # Predict spectral parameters
        if is_inference:
            mgc_inp = torch.cat([x, lf0], dim=-1)
            mgc = self.mgc_model.inference(mgc_inp, lengths)
        else:
            mgc_inp = torch.cat([x, y_lf0], dim=-1)
            mgc = self.mgc_model(mgc_inp, lengths, y_mgc)

        # Predict aperiodic parameters
        if is_inference:
            bap_inp = torch.cat([x, lf0], dim=-1)
            bap = self.bap_model.inference(bap_inp, lengths)
        else:
            bap_inp = torch.cat([x, y_lf0], dim=-1)
            bap = self.bap_model(bap_inp, lengths, y_bap)

        # Predict V/UV
        if is_inference:
            if self.vuv_model_bap0_conditioning:
                bap_cond = bap[:, :, 0:1]
            else:
                bap_cond = bap

            # full cond: (x, mgc, lf0, bap)
            vuv_inp = [x]
            if self.vuv_model_mgc_conditioning:
                vuv_inp.append(mgc)
            if self.vuv_model_bap_conditioning:
                vuv_inp.append(bap_cond)
            if self.vuv_model_lf0_conditioning:
                vuv_inp.append(lf0)
            vuv_inp = torch.cat(vuv_inp, dim=-1)
            vuv = self.vuv_model.inference(vuv_inp, lengths)
        else:
            if self.vuv_model_bap0_conditioning:
                y_bap_cond = y_bap[:, :, 0:1]
            else:
                y_bap_cond = y_bap

            vuv_inp = [x]
            if self.vuv_model_mgc_conditioning:
                vuv_inp.append(y_mgc)
            if self.vuv_model_bap_conditioning:
                vuv_inp.append(y_bap_cond)
            if self.vuv_model_lf0_conditioning:
                vuv_inp.append(y_lf0)
            vuv_inp = torch.cat(vuv_inp, dim=-1)
            vuv = self.vuv_model(vuv_inp, lengths, y_vuv)

        # Make a concatenated stream
        has_postnet_output = (
            isinstance(mgc, list) or isinstance(bap, list) or isinstance(vuv, list)
        )
        if has_postnet_output:
            outs = []
            for idx in range(len(mgc)):
                mgc_ = mgc[idx] if isinstance(mgc, list) else mgc
                lf0_ = lf0[idx] if isinstance(lf0, list) else lf0
                vuv_ = vuv[idx] if isinstance(vuv, list) else vuv
                bap_ = bap[idx] if isinstance(bap, list) else bap
                out = torch.cat([mgc_, lf0_, vuv_, bap_], dim=-1)
                assert out.shape[-1] == self.out_dim
                outs.append(out)
        else:
            outs = torch.cat([mgc, lf0, vuv, bap], dim=-1)
            assert outs.shape[-1] == self.out_dim

        return outs, lf0_residual

    def inference(self, x, lengths=None):
        return pad_inference(
            model=self,
            x=x,
            lengths=lengths,
            reduction_factor=self.reduction_factor,
            mdn=False,
        )
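
# Minimal sketch, not in the original source: the concatenation order of the
# V/UV conditioning features at inference time, mirroring the flag logic in
# NPSSMultistreamParametricModel.forward. All tensors and the 86-dim input
# size are placeholder assumptions; only the ordering (x, mgc, bap, lf0)
# matters.
def _demo_vuv_conditioning_order():
    B, T = 1, 10
    x = torch.zeros(B, T, 86)  # linguistic features (dimension is assumed)
    lf0 = torch.zeros(B, T, 1)  # predicted continuous log-F0
    bap = torch.zeros(B, T, 5)  # predicted band aperiodicity (dim. assumed)
    # With the default flags (BAP and log-F0 conditioning on, MGC off),
    # the V/UV input is [x, bap, lf0] along the feature axis:
    vuv_inp = torch.cat([x, bap, lf0], dim=-1)
    assert vuv_inp.shape[-1] == 86 + 5 + 1
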
class NPSSMDNMultistreamParametricModel(BaseModel):
    """NPSS-like cascaded multi-stream parametric model with mixture density networks.

    .. note::
        This class was originally designed to be used with MDNs. However, the
        internal design was changed to make it work with non-MDN and diffusion
        models. For example, you can use non-MDN models for MGC prediction.

    NPSS: :cite:t:`blaauw2017neural`

    acoustic features: [MGC, LF0, VUV, BAP]

    Conditional dependency:
    p(MGC, LF0, VUV, BAP |C) = p(LF0|C) p(MGC|LF0, C) p(BAP|LF0, C) p(VUV|LF0, BAP, C)

    Args:
        in_dim (int): Input dimension.
        out_dim (int): Output dimension.
        stream_sizes (list): List of stream sizes.
        reduction_factor (int): Reduction factor.
        lf0_model (BaseModel): Model for predicting log-F0.
        mgc_model (BaseModel): Model for predicting MGC.
        bap_model (BaseModel): Model for predicting BAP.
        vuv_model (BaseModel): Model for predicting V/UV.
        in_rest_idx (int): Index of the rest symbol in the input features.
        in_lf0_idx (int): Index of log-F0 in the input features.
        in_lf0_min (float): Minimum value of log-F0 in the training data of input features.
        in_lf0_max (float): Maximum value of log-F0 in the training data of input features.
        out_lf0_idx (int): Index of log-F0 in the output features. Typically 60.
        out_lf0_mean (float): Mean of log-F0 in the training data of output features.
        out_lf0_scale (float): Scale of log-F0 in the training data of output features.
        vuv_model_bap_conditioning (bool): If True, use BAP features for V/UV prediction.
        vuv_model_bap0_conditioning (bool): If True, use only the 0-th coefficient
            of BAP for V/UV prediction.
        vuv_model_lf0_conditioning (bool): If True, use log-F0 features for V/UV prediction.
        vuv_model_mgc_conditioning (bool): If True, use MGC features for V/UV prediction.
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        stream_sizes: list,
        reduction_factor: int,
        lf0_model: nn.Module,
        mgc_model: nn.Module,
        bap_model: nn.Module,
        vuv_model: nn.Module,
        # NOTE: you must carefully set the following parameters
        in_rest_idx=0,
        in_lf0_idx=51,
        in_lf0_min=5.3936276,
        in_lf0_max=6.491111,
        out_lf0_idx=60,
        out_lf0_mean=5.953093881972361,
        out_lf0_scale=0.23435173188961034,
        vuv_model_bap_conditioning=True,
        vuv_model_bap0_conditioning=False,
        vuv_model_lf0_conditioning=True,
        vuv_model_mgc_conditioning=False,
    ):
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.stream_sizes = stream_sizes
        self.reduction_factor = reduction_factor
        self.vuv_model_bap_conditioning = vuv_model_bap_conditioning
        self.vuv_model_bap0_conditioning = vuv_model_bap0_conditioning
        self.vuv_model_lf0_conditioning = vuv_model_lf0_conditioning
        self.vuv_model_mgc_conditioning = vuv_model_mgc_conditioning

        assert len(stream_sizes) == 4

        self.lf0_model = lf0_model
        self.mgc_model = mgc_model
        self.bap_model = bap_model
        self.vuv_model = vuv_model
        self.in_rest_idx = in_rest_idx
        self.in_lf0_idx = in_lf0_idx
        self.in_lf0_min = in_lf0_min
        self.in_lf0_max = in_lf0_max
        self.out_lf0_idx = out_lf0_idx
        self.out_lf0_mean = out_lf0_mean
        self.out_lf0_scale = out_lf0_scale

    def _set_lf0_params(self):
        # Special care for residual F0 prediction models
        # NOTE: don't overwrite out_lf0_idx and in_lf0_idx
        if hasattr(self.lf0_model, "out_lf0_mean"):
            self.lf0_model.in_lf0_min = self.in_lf0_min
            self.lf0_model.in_lf0_max = self.in_lf0_max
            self.lf0_model.out_lf0_mean = self.out_lf0_mean
            self.lf0_model.out_lf0_scale = self.out_lf0_scale

    def prediction_type(self):
        return PredictionType.MULTISTREAM_HYBRID

    def is_autoregressive(self):
        return (
            self.mgc_model.is_autoregressive()
            or self.lf0_model.is_autoregressive()
            or self.vuv_model.is_autoregressive()
            or self.bap_model.is_autoregressive()
        )

    def has_residual_lf0_prediction(self):
        return True

    def forward(self, x, lengths=None, y=None):
        self._set_lf0_params()
        assert x.shape[-1] == self.in_dim
        is_inference = y is None

        if is_inference:
            y_mgc, y_lf0, y_vuv, y_bap = None, None, None, None
        else:
            # Teacher-forcing
            y_mgc, y_lf0, y_vuv, y_bap = split_streams(y, self.stream_sizes)

        # Predict continuous log-F0 first
        if is_inference:
            lf0, lf0_residual = self.lf0_model.inference(x, lengths), None
            if self.lf0_model.prediction_type() == PredictionType.PROBABILISTIC:
                lf0_cond = lf0[0]
            else:
                lf0_cond = lf0
        else:
            lf0, lf0_residual = self.lf0_model(x, lengths, y_lf0)

        # Predict spectral parameters
        if is_inference:
            mgc_inp = torch.cat([x, lf0_cond], dim=-1)
            mgc = self.mgc_model.inference(mgc_inp, lengths)
        else:
            mgc_inp = torch.cat([x, y_lf0], dim=-1)
            mgc = self.mgc_model(mgc_inp, lengths, y_mgc)

        # Predict aperiodic parameters
        if is_inference:
            bap_inp = torch.cat([x, lf0_cond], dim=-1)
            bap = self.bap_model.inference(bap_inp, lengths)
        else:
            bap_inp = torch.cat([x, y_lf0], dim=-1)
            bap = self.bap_model(bap_inp, lengths, y_bap)

        # Predict V/UV
        if is_inference:
            if self.bap_model.prediction_type() == PredictionType.PROBABILISTIC:
                bap_cond = bap[0]
            else:
                bap_cond = bap
            if self.mgc_model.prediction_type() == PredictionType.PROBABILISTIC:
                mgc_cond = mgc[0]
            else:
                mgc_cond = mgc

            if self.vuv_model_bap0_conditioning:
                bap_cond = bap_cond[:, :, 0:1]

            # full cond: (x, mgc, lf0, bap)
            vuv_inp = [x]
            if self.vuv_model_mgc_conditioning:
                vuv_inp.append(mgc_cond)
            if self.vuv_model_lf0_conditioning:
                vuv_inp.append(lf0_cond)
            if self.vuv_model_bap_conditioning:
                vuv_inp.append(bap_cond)
            vuv_inp = torch.cat(vuv_inp, dim=-1)
            vuv = self.vuv_model.inference(vuv_inp, lengths)
        else:
            if self.vuv_model_bap0_conditioning:
                y_bap_cond = y_bap[:, :, 0:1]
            else:
                y_bap_cond = y_bap

            vuv_inp = [x]
            if self.vuv_model_mgc_conditioning:
                vuv_inp.append(y_mgc)
            if self.vuv_model_lf0_conditioning:
                vuv_inp.append(y_lf0)
            if self.vuv_model_bap_conditioning:
                vuv_inp.append(y_bap_cond)
            vuv_inp = torch.cat(vuv_inp, dim=-1)
            vuv = self.vuv_model(vuv_inp, lengths, y_vuv)

        if is_inference:
            if self.lf0_model.prediction_type() == PredictionType.PROBABILISTIC:
                lf0_ = lf0[0]
            else:
                lf0_ = lf0
            if self.bap_model.prediction_type() == PredictionType.PROBABILISTIC:
                bap_ = bap[0]
            else:
                bap_ = bap
            if self.mgc_model.prediction_type() == PredictionType.PROBABILISTIC:
                mgc_ = mgc[0]
            else:
                mgc_ = mgc
            out = torch.cat([mgc_, lf0_, vuv, bap_], dim=-1)
            assert out.shape[-1] == self.out_dim
            # TODO: better design
            return out, out
        else:
            return (mgc, lf0, vuv, bap), lf0_residual

    def inference(self, x, lengths=None):
        return pad_inference(
            model=self,
            x=x,
            lengths=lengths,
            reduction_factor=self.reduction_factor,
            mdn=True,
        )
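
# Illustrative sketch of the PROBABILISTIC handling above, not part of the
# module: sub-models flagged as probabilistic (e.g., MDNs) are assumed to
# return a tuple whose first element is the mean, so ``[0]`` selects the mean
# as the conditioning feature for downstream streams.
def _demo_probabilistic_cond(lf0_model, x, lengths):
    lf0 = lf0_model.inference(x, lengths)
    if lf0_model.prediction_type() == PredictionType.PROBABILISTIC:
        lf0_cond = lf0[0]  # take the mean (first element of the tuple)
    else:
        lf0_cond = lf0  # deterministic models return the tensor directly
    return lf0_cond
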
class MultistreamSeparateF0MelModel(BaseModel):
    """Multi-stream model with a separate F0 prediction model (mel-version)

    Conditional dependency:
    p(MEL, LF0, VUV|C) = p(LF0|C) p(MEL|LF0, C) p(VUV|LF0, C)

    Args:
        in_dim (int): Input dimension.
        out_dim (int): Output dimension.
        stream_sizes (list): List of stream sizes.
        reduction_factor (int): Reduction factor.
        encoder (nn.Module): A shared encoder.
        mel_model (nn.Module): MEL prediction model.
        lf0_model (nn.Module): log-F0 prediction model.
        vuv_model (nn.Module): V/UV prediction model.
        in_rest_idx (int): Index of the rest symbol in the input features.
        in_lf0_idx (int): Index of log-F0 in the input features.
        in_lf0_min (float): Minimum value of log-F0 in the training data of input features.
        in_lf0_max (float): Maximum value of log-F0 in the training data of input features.
        out_lf0_idx (int): Index of log-F0 in the output features. Typically 180.
        out_lf0_mean (float): Mean of log-F0 in the training data of output features.
        out_lf0_scale (float): Scale of log-F0 in the training data of output features.
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        stream_sizes: list,
        reduction_factor: int,
        encoder: nn.Module,
        mel_model: nn.Module,
        lf0_model: nn.Module,
        vuv_model: nn.Module,
        # NOTE: you must carefully set the following parameters
        in_rest_idx=1,
        in_lf0_idx=300,
        in_lf0_min=5.3936276,
        in_lf0_max=6.491111,
        out_lf0_idx=180,
        out_lf0_mean=5.953093881972361,
        out_lf0_scale=0.23435173188961034,
    ):
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.stream_sizes = stream_sizes
        self.reduction_factor = reduction_factor

        assert len(stream_sizes) == 3

        self.encoder = encoder
        if self.encoder is not None:
            assert not encoder.is_autoregressive()
        self.mel_model = mel_model
        self.lf0_model = lf0_model
        self.vuv_model = vuv_model
        self.in_rest_idx = in_rest_idx
        self.in_lf0_idx = in_lf0_idx
        self.in_lf0_min = in_lf0_min
        self.in_lf0_max = in_lf0_max
        self.out_lf0_idx = out_lf0_idx
        self.out_lf0_mean = out_lf0_mean
        self.out_lf0_scale = out_lf0_scale

    def _set_lf0_params(self):
        # Special care for residual F0 prediction models
        # NOTE: don't overwrite out_lf0_idx and in_lf0_idx
        if hasattr(self.lf0_model, "out_lf0_mean"):
            self.lf0_model.in_lf0_min = self.in_lf0_min
            self.lf0_model.in_lf0_max = self.in_lf0_max
            self.lf0_model.out_lf0_mean = self.out_lf0_mean
            self.lf0_model.out_lf0_scale = self.out_lf0_scale

    def is_autoregressive(self):
        return (
            self.mel_model.is_autoregressive()
            or self.lf0_model.is_autoregressive()
            or self.vuv_model.is_autoregressive()
        )

    def has_residual_lf0_prediction(self):
        return True

    def forward(self, x, lengths=None, y=None):
        self._set_lf0_params()
        assert x.shape[-1] == self.in_dim

        if y is not None:
            # Teacher-forcing
            y_mel, y_lf0, y_vuv = split_streams(y, self.stream_sizes)
        else:
            # Inference
            y_mel, y_lf0, y_vuv = None, None, None

        # Predict continuous log-F0 first
        lf0, lf0_residual = self.lf0_model(x, lengths, y_lf0)

        if self.encoder is not None:
            encoder_outs = self.encoder(x, lengths)
            # Concat log-F0, rest flags and the outputs of the encoder
            # This may make the decoder aware of the input F0
            rest_flags = x[:, :, self.in_rest_idx].unsqueeze(-1)
            if y is not None:
                encoder_outs = torch.cat([encoder_outs, rest_flags, y_lf0], dim=-1)
            else:
                encoder_outs = torch.cat([encoder_outs, rest_flags, lf0], dim=-1)
        else:
            encoder_outs = x

        # Decoders for each stream
        mel = self.mel_model(encoder_outs, lengths, y_mel)
        vuv = self.vuv_model(encoder_outs, lengths, y_vuv)

        # Make a concatenated stream
        has_postnet_output = (
            isinstance(mel, list) or isinstance(lf0, list) or isinstance(vuv, list)
        )
        if has_postnet_output:
            outs = []
            for idx in range(len(mel)):
                mel_ = mel[idx] if isinstance(mel, list) else mel
                lf0_ = lf0[idx] if isinstance(lf0, list) else lf0
                vuv_ = vuv[idx] if isinstance(vuv, list) else vuv
                out = torch.cat([mel_, lf0_, vuv_], dim=-1)
                assert out.shape[-1] == self.out_dim
                outs.append(out)
            return outs, lf0_residual
        else:
            out = torch.cat([mel, lf0, vuv], dim=-1)
            assert out.shape[-1] == self.out_dim

        return out, lf0_residual

    def inference(self, x, lengths=None):
        return pad_inference(
            model=self, x=x, lengths=lengths, reduction_factor=self.reduction_factor
        )
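
# Sketch of the presumed role of the reduction factor in ``pad_inference``;
# this is an assumption about intent, not the actual implementation of
# nnsvs.acoustic_models.util.pad_inference: the time axis is padded to a
# multiple of the reduction factor so that models operating at reduced frame
# rates see a compatible number of frames.
def _demo_pad_to_reduction_factor(x, reduction_factor):
    T = x.shape[1]
    pad = (reduction_factor - T % reduction_factor) % reduction_factor
    if pad > 0:
        # pad takes (left, right) pairs from the last dim backwards: the
        # feature axis is untouched, trailing frames are appended on time
        x = torch.nn.functional.pad(x, (0, 0, 0, pad))
    return x
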
class MDNMultistreamSeparateF0MelModel(BaseModel):
    """Multi-stream model with a separate F0 model (mel-version) and MDN

    V/UV prediction is performed given a mel-spectrogram.

    Conditional dependency:
    p(MEL, LF0, VUV|C) = p(LF0|C) p(MEL|LF0, C) p(VUV|LF0, MEL, C)

    .. note::
        This class was originally designed to be used with MDNs. However, the
        internal design was changed to make it work with non-MDN and diffusion
        models. For example, you can use non-MDN models for mel prediction.

    Args:
        in_dim (int): Input dimension.
        out_dim (int): Output dimension.
        stream_sizes (list): List of stream sizes.
        reduction_factor (int): Reduction factor.
        mel_model (nn.Module): MEL prediction model.
        lf0_model (nn.Module): log-F0 prediction model.
        vuv_model (nn.Module): V/UV prediction model.
        in_rest_idx (int): Index of the rest symbol in the input features.
        in_lf0_idx (int): Index of log-F0 in the input features.
        in_lf0_min (float): Minimum value of log-F0 in the training data of input features.
        in_lf0_max (float): Maximum value of log-F0 in the training data of input features.
        out_lf0_idx (int): Index of log-F0 in the output features. Typically 60.
        out_lf0_mean (float): Mean of log-F0 in the training data of output features.
        out_lf0_scale (float): Scale of log-F0 in the training data of output features.
        vuv_model_lf0_conditioning (bool): If True, use log-F0 features for V/UV prediction.
        vuv_model_mel_conditioning (bool): If True, use mel features for V/UV prediction.
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        stream_sizes: list,
        reduction_factor: int,
        mel_model: nn.Module,
        lf0_model: nn.Module,
        vuv_model: nn.Module,
        # NOTE: you must carefully set the following parameters
        in_rest_idx=0,
        in_lf0_idx=51,
        in_lf0_min=5.3936276,
        in_lf0_max=6.491111,
        out_lf0_idx=60,
        out_lf0_mean=5.953093881972361,
        out_lf0_scale=0.23435173188961034,
        vuv_model_lf0_conditioning=True,
        vuv_model_mel_conditioning=True,
    ):
        super().__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.stream_sizes = stream_sizes
        self.reduction_factor = reduction_factor
        self.vuv_model_lf0_conditioning = vuv_model_lf0_conditioning
        self.vuv_model_mel_conditioning = vuv_model_mel_conditioning

        assert len(stream_sizes) == 3

        self.mel_model = mel_model
        self.lf0_model = lf0_model
        self.vuv_model = vuv_model
        self.in_rest_idx = in_rest_idx
        self.in_lf0_idx = in_lf0_idx
        self.in_lf0_min = in_lf0_min
        self.in_lf0_max = in_lf0_max
        self.out_lf0_idx = out_lf0_idx
        self.out_lf0_mean = out_lf0_mean
        self.out_lf0_scale = out_lf0_scale

    def _set_lf0_params(self):
        # Special care for residual F0 prediction models
        # NOTE: don't overwrite out_lf0_idx and in_lf0_idx
        if hasattr(self.lf0_model, "out_lf0_mean"):
            self.lf0_model.in_lf0_min = self.in_lf0_min
            self.lf0_model.in_lf0_max = self.in_lf0_max
            self.lf0_model.out_lf0_mean = self.out_lf0_mean
            self.lf0_model.out_lf0_scale = self.out_lf0_scale

    def prediction_type(self):
        return PredictionType.MULTISTREAM_HYBRID

    def is_autoregressive(self):
        return (
            self.mel_model.is_autoregressive()
            or self.lf0_model.is_autoregressive()
            or self.vuv_model.is_autoregressive()
        )

    def has_residual_lf0_prediction(self):
        return True

    def forward(self, x, lengths=None, y=None):
        self._set_lf0_params()
        assert x.shape[-1] == self.in_dim
        is_inference = y is None

        if y is not None:
            # Teacher-forcing
            y_mel, y_lf0, y_vuv = split_streams(y, self.stream_sizes)
        else:
            # Inference
            y_mel, y_lf0, y_vuv = None, None, None

        # Predict continuous log-F0 first
        if is_inference:
            lf0, lf0_residual = self.lf0_model.inference(x, lengths), None
            if self.lf0_model.prediction_type() == PredictionType.PROBABILISTIC:
                lf0_cond = lf0[0]
            else:
                lf0_cond = lf0
        else:
            lf0, lf0_residual = self.lf0_model(x, lengths, y_lf0)

        # Predict mel
        if is_inference:
            mel_inp = torch.cat([x, lf0_cond], dim=-1)
            mel = self.mel_model.inference(mel_inp, lengths)
        else:
            mel_inp = torch.cat([x, y_lf0], dim=-1)
            mel = self.mel_model(mel_inp, lengths, y_mel)

        # Predict V/UV
        if is_inference:
            if self.mel_model.prediction_type() == PredictionType.PROBABILISTIC:
                mel_cond = mel[0]
            else:
                mel_cond = mel

            # full cond: (x, lf0, mel)
            vuv_inp = [x]
            if self.vuv_model_lf0_conditioning:
                vuv_inp.append(lf0_cond)
            if self.vuv_model_mel_conditioning:
                vuv_inp.append(mel_cond)
            vuv_inp = torch.cat(vuv_inp, dim=-1)
            vuv = self.vuv_model.inference(vuv_inp, lengths)
        else:
            vuv_inp = [x]
            if self.vuv_model_lf0_conditioning:
                vuv_inp.append(y_lf0)
            if self.vuv_model_mel_conditioning:
                vuv_inp.append(y_mel)
            vuv_inp = torch.cat(vuv_inp, dim=-1)
            vuv = self.vuv_model(vuv_inp, lengths, y_vuv)

        if is_inference:
            if self.lf0_model.prediction_type() == PredictionType.PROBABILISTIC:
                lf0_ = lf0[0]
            else:
                lf0_ = lf0
            if self.mel_model.prediction_type() == PredictionType.PROBABILISTIC:
                mel_ = mel[0]
            else:
                mel_ = mel
            out = torch.cat([mel_, lf0_, vuv], dim=-1)
            assert out.shape[-1] == self.out_dim
            # TODO: better design
            return out, out
        else:
            return (mel, lf0, vuv), lf0_residual

    def inference(self, x, lengths=None):
        return pad_inference(
            model=self,
            x=x,
            lengths=lengths,
            reduction_factor=self.reduction_factor,
            mdn=True,
        )
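
# Usage sketch (illustrative; construction of the model and its sub-models is
# omitted): the hybrid mel model returns per-stream outputs during training
# but a single concatenated [MEL, LF0, VUV] tensor at inference, so callers
# branch on the mode they ran in.
def _demo_hybrid_outputs(model, x, lengths, y):
    # Training mode (y given): per-stream outputs plus the log-F0 residual
    (mel, lf0, vuv), lf0_residual = model(x, lengths, y)
    # Inference mode (y=None): a concatenated stream (returned twice; see the
    # "TODO: better design" note in forward)
    out, _ = model(x, lengths, None)
    assert out.shape[-1] == model.out_dim
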