Source code for nnsvs.util

import importlib
import random
from os.path import join
from pathlib import Path
from typing import Any

import numpy as np
import pkg_resources
import pyworld
import torch
from hydra.utils import instantiate
from nnsvs.multistream import get_static_features, get_static_stream_sizes
from nnsvs.usfgan import USFGANWrapper
from omegaconf import OmegaConf
from torch import nn

try:
    from parallel_wavegan.utils import load_model

    _pwg_available = True
except ImportError:
    _pwg_available = False


# mask-related functions were adapted from https://github.com/espnet/espnet

EXAMPLE_DIR = "_example_data"


# Adapted from https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix
def init_weights(net, init_type="normal", init_gain=0.02):
    """Initialize network weights.

    Args:
        net (torch.nn.Module): network to initialize
        init_type (str): the name of an initialization method:
            normal | xavier_normal | kaiming_normal | orthogonal | none.
        init_gain (float): scaling factor for normal, xavier_normal and orthogonal.
    """
    if init_type == "none":
        return

    def init_func(m):
        classname = m.__class__.__name__
        if hasattr(m, "weight") and (
            classname.find("Conv") != -1 or classname.find("Linear") != -1
        ):
            if init_type == "normal":
                nn.init.normal_(m.weight.data, 0.0, init_gain)
            elif init_type == "xavier_normal":
                nn.init.xavier_normal_(m.weight.data, gain=init_gain)
            elif init_type == "kaiming_normal":
                nn.init.kaiming_normal_(m.weight.data, a=0, mode="fan_in")
            elif init_type == "orthogonal":
                nn.init.orthogonal_(m.weight.data, gain=init_gain)
            else:
                raise NotImplementedError(
                    "initialization method [%s] is not implemented" % init_type
                )
            if hasattr(m, "bias") and m.bias is not None:
                nn.init.constant_(m.bias.data, 0.0)
        elif classname.find("BatchNorm2d") != -1:
            # BatchNorm's weight is not a matrix; only normal initialization applies.
            nn.init.normal_(m.weight.data, 1.0, init_gain)
            nn.init.constant_(m.bias.data, 0.0)

    net.apply(init_func)

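# Usage sketch (a minimal example; the layer sizes are illustrative):
#
#     >>> from torch import nn
#     >>> net = nn.Sequential(nn.Linear(4, 4), nn.ReLU(), nn.Linear(4, 2))
#     >>> init_weights(net, init_type="xavier_normal", init_gain=0.02)
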
def get_world_stream_info(
    sr: int,
    mgc_order: int,
    num_windows: int = 3,
    vibrato_mode: str = "none",
    use_mcep_aperiodicity: bool = False,
    mcep_aperiodicity_order: int = 24,
):
    """Get stream sizes for WORLD-based acoustic features.

    Args:
        sr (int): sampling rate
        mgc_order (int): order of mel-generalized cepstrum
        num_windows (int): number of windows
        vibrato_mode (str): vibrato analysis mode (none, diff or sine)
        use_mcep_aperiodicity (bool): use mel-cepstrum-based aperiodicity
            instead of WORLD's band aperiodicity
        mcep_aperiodicity_order (int): order of mel-cepstrum-based aperiodicity

    Returns:
        list: stream sizes
    """
    # [mgc, lf0, vuv, bap]
    stream_sizes = [
        (mgc_order + 1) * num_windows,
        num_windows,
        1,
        pyworld.get_num_aperiodicities(sr) * num_windows
        if not use_mcep_aperiodicity
        else mcep_aperiodicity_order + 1,
    ]
    if vibrato_mode == "diff":
        # vib
        stream_sizes.append(num_windows)
    elif vibrato_mode == "sine":
        # vib + vib_flags
        stream_sizes.append(3 * num_windows)
        stream_sizes.append(1)
    elif vibrato_mode == "none":
        pass
    else:
        raise RuntimeError("Unknown vibrato mode: {}".format(vibrato_mode))

    return stream_sizes

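# Example (assuming WORLD's defaults, where pyworld.get_num_aperiodicities(48000)
# returns 5, so bap takes 5 * num_windows dimensions):
#
#     >>> get_world_stream_info(48000, mgc_order=59, num_windows=3)
#     [180, 3, 1, 15]
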
def load_utt_list(utt_list):
    """Load a list of utterances.

    Args:
        utt_list (str): path to a file containing a list of utterances

    Returns:
        List[str]: list of utterances
    """
    with open(utt_list) as f:
        utt_ids = f.readlines()
    utt_ids = map(lambda utt_id: utt_id.strip(), utt_ids)
    utt_ids = filter(lambda utt_id: len(utt_id) > 0, utt_ids)
    return list(utt_ids)

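# Example (assuming a file "train.list" with one utterance id per line,
# e.g. "utt001\nutt002\n"; blank lines are dropped):
#
#     >>> load_utt_list("train.list")
#     ['utt001', 'utt002']
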
def example_xml_file(key="haruga_kita"):
    """Get the path to an included xml file.

    Args:
        key (str): key of the file

    Returns:
        str: path to an example xml file

    Raises:
        FileNotFoundError: if the file is not found
    """
    return pkg_resources.resource_filename(__name__, join(EXAMPLE_DIR, f"{key}.xml"))

def init_seed(seed):
    """Initialize random seed.

    Args:
        seed (int): random seed
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

def dynamic_import(name: str) -> Any:
    """Dynamic import.

    Args:
        name (str): module_name + ":" + class_name

    Returns:
        Any: class object
    """
    mod_name, class_name = name.split(":")
    mod = importlib.import_module(mod_name)
    return getattr(mod, class_name)

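# Example (resolving a class from its "module:ClassName" spec):
#
#     >>> dynamic_import("torch.nn:Linear")
#     <class 'torch.nn.modules.linear.Linear'>
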
def pad_2d(x, max_len, constant_values=0):
    """Pad a 2d-array along the time axis.

    Args:
        x (np.ndarray): array to pad of shape (T, D)
        max_len (int): padded length of the first axis
        constant_values (int, optional): value to pad with. Default: 0

    Returns:
        np.ndarray: padded array
    """
    x = np.pad(
        x,
        [(0, max_len - len(x)), (0, 0)],
        mode="constant",
        constant_values=constant_values,
    )
    return x

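# Example (padding a (2, 3) array to 4 frames; only the first axis grows):
#
#     >>> x = np.ones((2, 3))
#     >>> pad_2d(x, 4).shape
#     (4, 3)
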
def make_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
    """Make mask tensor containing indices of padded part.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
        maxlen (int, optional): Maximum length of the mask.
            If None, it is inferred from ``lengths`` or ``xs``.

    Returns:
        Tensor: Mask tensor containing indices of padded part.
            dtype=torch.uint8 in PyTorch 1.2-
            dtype=torch.bool in PyTorch 1.2+ (including 1.2)
    """
    if length_dim == 0:
        raise ValueError("length_dim cannot be 0: {}".format(length_dim))

    if not isinstance(lengths, list):
        lengths = lengths.tolist()
    bs = int(len(lengths))
    if maxlen is None:
        if xs is None:
            maxlen = int(max(lengths))
        else:
            maxlen = xs.size(length_dim)

    seq_range = torch.arange(0, maxlen, dtype=torch.int64)
    seq_range_expand = seq_range.unsqueeze(0).expand(bs, maxlen)
    seq_length_expand = seq_range_expand.new(lengths).unsqueeze(-1)
    mask = seq_range_expand >= seq_length_expand

    if xs is not None:
        assert xs.size(0) == bs, (xs.size(0), bs)

        if length_dim < 0:
            length_dim = xs.dim() + length_dim
        # ind = (:, None, ..., None, :, None, ..., None)
        ind = tuple(
            slice(None) if i in (0, length_dim) else None for i in range(xs.dim())
        )
        mask = mask[ind].expand_as(xs).to(xs.device)
    return mask

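# Example (True marks padded positions; maxlen defaults to max(lengths)):
#
#     >>> make_pad_mask([3, 1, 2])
#     tensor([[False, False, False],
#             [False,  True,  True],
#             [False, False,  True]])
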
def make_non_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
    """Make mask tensor containing indices of non-padded part.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
        maxlen (int, optional): Maximum length of the mask.
            If None, it is inferred from ``lengths`` or ``xs``.

    Returns:
        ByteTensor: mask tensor containing indices of non-padded part.
            dtype=torch.uint8 in PyTorch 1.2-
            dtype=torch.bool in PyTorch 1.2+ (including 1.2)
    """
    return ~make_pad_mask(lengths, xs, length_dim, maxlen)

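# Example (the logical complement of make_pad_mask; True marks valid frames):
#
#     >>> make_non_pad_mask([3, 1, 2])
#     tensor([[ True,  True,  True],
#             [ True, False, False],
#             [ True,  True, False]])
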
class PyTorchStandardScaler(nn.Module):
    """PyTorch module for standardization.

    Args:
        mean (torch.Tensor): mean
        scale (torch.Tensor): scale
    """

    def __init__(self, mean, scale):
        super().__init__()
        self.mean_ = nn.Parameter(mean, requires_grad=False)
        self.scale_ = nn.Parameter(scale, requires_grad=False)

    def transform(self, x):
        return (x - self.mean_) / self.scale_

    def inverse_transform(self, x):
        return x * self.scale_ + self.mean_

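# Round-trip sketch (toy statistics; real scalers are built from training data):
#
#     >>> scaler = PyTorchStandardScaler(torch.zeros(3), torch.ones(3))
#     >>> x = torch.randn(10, 3)
#     >>> torch.allclose(scaler.inverse_transform(scaler.transform(x)), x)
#     True
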
class StandardScaler:
    """sklearn.preprocessing.StandardScaler-like class with only transform functionality.

    Args:
        mean (np.ndarray): mean
        var (np.ndarray): variance
        scale (np.ndarray): scale
    """

    def __init__(self, mean, var, scale):
        self.mean_ = mean
        self.var_ = var
        # NOTE: scale may not be exactly the same as np.sqrt(var)
        self.scale_ = scale

    def transform(self, x):
        return (x - self.mean_) / self.scale_

    def inverse_transform(self, x):
        return x * self.scale_ + self.mean_

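# Usage sketch (statistics computed inline; nnsvs typically loads them from
# .npy files produced during preprocessing):
#
#     >>> data = np.random.randn(100, 3)
#     >>> scaler = StandardScaler(data.mean(0), data.var(0), data.std(0))
#     >>> z = scaler.transform(data)
#     >>> np.allclose(scaler.inverse_transform(z), data)
#     True
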
class MinMaxScaler:
    """sklearn.preprocessing.MinMaxScaler-like class with only transform functionality.

    Args:
        min (np.ndarray): minimum
        scale (np.ndarray): scale
        data_min (np.ndarray): minimum of input data
        data_max (np.ndarray): maximum of input data
        feature_range (tuple): (min, max)
    """

    def __init__(self, min, scale, data_min=None, data_max=None, feature_range=(0, 1)):
        self.min_ = min
        self.scale_ = scale
        self.data_min_ = data_min
        self.data_max_ = data_max
        self.feature_range = feature_range

    def transform(self, x):
        return self.scale_ * x + self.min_

    def inverse_transform(self, x):
        return (x - self.min_) / self.scale_

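# Usage sketch, following sklearn's parameterization where
# scale = (max - min) / (data_max - data_min) and min = feature_min - data_min * scale
# (the toy data range [0, 4] below is illustrative):
#
#     >>> data_min, data_max = np.zeros(2), np.full(2, 4.0)
#     >>> scale = 1.0 / (data_max - data_min)
#     >>> scaler = MinMaxScaler(0.0 - data_min * scale, scale, data_min, data_max)
#     >>> scaler.transform(np.array([[0.0, 2.0], [4.0, 4.0]]))
#     array([[0. , 0.5],
#            [1. , 1. ]])
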
def extract_static_scaler(out_scaler, model_config):
    """Extract a scaler for static features.

    Args:
        out_scaler (StandardScaler or MinMaxScaler): target scaler
        model_config (dict): model config that contains stream information

    Returns:
        StandardScaler or MinMaxScaler: scaler for static features
    """
    mean_ = get_static_features(
        out_scaler.mean_.reshape(1, 1, out_scaler.mean_.shape[-1]),
        model_config.num_windows,
        model_config.stream_sizes,
        model_config.has_dynamic_features,
    )
    mean_ = np.concatenate(mean_, -1).reshape(1, -1)
    var_ = get_static_features(
        out_scaler.var_.reshape(1, 1, out_scaler.var_.shape[-1]),
        model_config.num_windows,
        model_config.stream_sizes,
        model_config.has_dynamic_features,
    )
    var_ = np.concatenate(var_, -1).reshape(1, -1)
    scale_ = get_static_features(
        out_scaler.scale_.reshape(1, 1, out_scaler.scale_.shape[-1]),
        model_config.num_windows,
        model_config.stream_sizes,
        model_config.has_dynamic_features,
    )
    scale_ = np.concatenate(scale_, -1).reshape(1, -1)
    static_scaler = StandardScaler(mean_, var_, scale_)
    return static_scaler


def load_vocoder(path, device, acoustic_config):
    """Load a vocoder model from a given checkpoint path.

    Note that the path needs to be a checkpoint of PWG or uSFGAN.

    Args:
        path (str or Path): Path to the vocoder model
        device (str): Device to load the model
        acoustic_config (dict): Acoustic model config

    Returns:
        tuple: (vocoder, vocoder_in_scaler, vocoder_config)
    """
    if not _pwg_available:
        raise RuntimeError(
            "parallel_wavegan is required to load pre-trained checkpoint."
        )
    path = Path(path) if isinstance(path, str) else path
    model_dir = path.parent
    if (model_dir / "vocoder_model.yaml").exists():
        # packed model
        vocoder_config = OmegaConf.load(model_dir / "vocoder_model.yaml")
    elif (model_dir / "config.yml").exists():
        # PWG checkpoint
        vocoder_config = OmegaConf.load(model_dir / "config.yml")
    else:
        # usfgan
        vocoder_config = OmegaConf.load(model_dir / "config.yaml")

    if "generator" in vocoder_config and "discriminator" in vocoder_config:
        # usfgan
        checkpoint = torch.load(
            path,
            map_location=lambda storage, loc: storage,
        )
        vocoder = instantiate(vocoder_config.generator).to(device)
        vocoder.load_state_dict(checkpoint["model"]["generator"])
        vocoder.remove_weight_norm()
        vocoder = USFGANWrapper(vocoder_config, vocoder)

        stream_sizes = get_static_stream_sizes(
            acoustic_config.stream_sizes,
            acoustic_config.has_dynamic_features,
            acoustic_config.num_windows,
        )

        # Extract scaler params for [mgc, bap]
        if vocoder_config.data.aux_feats == ["mcep", "codeap"]:
            # streams: (mgc, lf0, vuv, bap)
            mean_ = np.load(model_dir / "in_vocoder_scaler_mean.npy")
            var_ = np.load(model_dir / "in_vocoder_scaler_var.npy")
            scale_ = np.load(model_dir / "in_vocoder_scaler_scale.npy")
            mgc_end_dim = stream_sizes[0]
            bap_start_dim = sum(stream_sizes[:3])
            bap_end_dim = sum(stream_sizes[:4])
            vocoder_in_scaler = StandardScaler(
                np.concatenate([mean_[:mgc_end_dim], mean_[bap_start_dim:bap_end_dim]]),
                np.concatenate([var_[:mgc_end_dim], var_[bap_start_dim:bap_end_dim]]),
                np.concatenate(
                    [scale_[:mgc_end_dim], scale_[bap_start_dim:bap_end_dim]]
                ),
            )
        else:
            # streams: (mel, lf0, vuv)
            mel_dim = stream_sizes[0]
            vocoder_in_scaler = StandardScaler(
                np.load(model_dir / "in_vocoder_scaler_mean.npy")[:mel_dim],
                np.load(model_dir / "in_vocoder_scaler_var.npy")[:mel_dim],
                np.load(model_dir / "in_vocoder_scaler_scale.npy")[:mel_dim],
            )
    else:
        # PWG
        vocoder = load_model(path, config=vocoder_config).to(device)
        vocoder.remove_weight_norm()
        vocoder_in_scaler = StandardScaler(
            np.load(model_dir / "in_vocoder_scaler_mean.npy"),
            np.load(model_dir / "in_vocoder_scaler_var.npy"),
            np.load(model_dir / "in_vocoder_scaler_scale.npy"),
        )
    vocoder.eval()

    return vocoder, vocoder_in_scaler, vocoder_config
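# Usage sketch (the checkpoint path is hypothetical; any PWG/uSFGAN checkpoint
# whose directory also holds its config and in_vocoder_scaler_*.npy files works,
# and acoustic_config is assumed to be the acoustic model's config loaded elsewhere,
# e.g. via OmegaConf):
#
#     >>> vocoder, vocoder_in_scaler, vocoder_config = load_vocoder(
#     ...     "exp/vocoder/checkpoint-400000steps.pkl",
#     ...     device="cpu",
#     ...     acoustic_config=acoustic_config,
#     ... )
#     >>> vocoder.training
#     False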