import numpy as np
import torch
from nnsvs.base import BaseModel
from nnsvs.multistream import split_streams
from nnsvs.util import init_weights
from torch import nn
def variance_scaling(gv, feats, offset=2, note_frame_indices=None):
"""Variance scaling method to enhance synthetic speech quality
Method proposed in :cite:t:`silen2012ways`.
Args:
gv (tensor): global variance computed over training data
feats (tensor): input features
offset (int): offset
note_frame_indices (tensor): indices of note frames
Returns:
tensor: scaled features
"""
    if note_frame_indices is not None:
        if len(note_frame_indices) == 0:
            return feats
        utt_gv = feats[note_frame_indices].var(0)
        utt_mu = feats[note_frame_indices].mean(0)
    else:
        utt_gv = feats.var(0)
        utt_mu = feats.mean(0)
    out = feats.copy()
    if note_frame_indices is not None:
        out[note_frame_indices, offset:] = (
            np.sqrt(gv[offset:] / utt_gv[offset:])
            * (feats[note_frame_indices, offset:] - utt_mu[offset:])
            + utt_mu[offset:]
        )
    else:
        out[:, offset:] = (
            np.sqrt(gv[offset:] / utt_gv[offset:])
            * (feats[:, offset:] - utt_mu[offset:])
            + utt_mu[offset:]
        )
    return out
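

# A minimal usage sketch (illustration only, not part of the nnsvs API).
# ``variance_scaling`` operates on numpy arrays; in practice ``gv`` would be
# the per-dimension variance computed over the training data.
def _example_variance_scaling():
    T, C = 100, 60
    feats = np.random.randn(T, C) * 0.5  # stand-in for over-smoothed features
    gv = np.ones(C)  # stand-in for the training-data global variance
    scaled = variance_scaling(gv, feats, offset=2)
    # The first ``offset`` dimensions are kept unchanged
    assert np.allclose(scaled[:, :2], feats[:, :2])
    return scaled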


class MovingAverage1d(nn.Conv1d):
    """Moving average filter on 1-D signals

    Each channel is smoothed independently with a uniform (box) kernel.

    Args:
        in_channels (int): number of input channels
        out_channels (int): number of output channels
        kernel_size (int): kernel size
        padding_mode (str): padding mode
    """
    def __init__(self, in_channels, out_channels, kernel_size, padding_mode="reflect"):
        # NOTE: process each channel independently by setting groups=in_channels
        super().__init__(
            in_channels,
            out_channels,
            kernel_size,
            groups=in_channels,
            bias=False,
            padding="same",
            padding_mode=padding_mode,
        )
        # Uniform weights make the convolution an average over the window
        nn.init.constant_(self.weight, 1 / kernel_size)
        # The filter is fixed; exclude it from gradient-based training
        for p in self.parameters():
            p.requires_grad = False
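

# A minimal usage sketch (illustration only): smoothing a batch of noise
# sequences with a 5-tap moving average. Note that ``padding="same"`` with a
# non-zero padding mode requires a reasonably recent PyTorch (>= 1.9).
def _example_moving_average():
    smooth = MovingAverage1d(4, 4, kernel_size=5)  # 4 channels, 5-frame window
    z = torch.randn(2, 4, 100)  # (B, C, T)
    return smooth(z)  # same shape; each channel smoothed independently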


class Conv2dPostFilter(BaseModel):
    """A post-filter based on Conv2d

    A model proposed in :cite:t:`kaneko2017generative`.

    Args:
        in_dim (int): dimension of input features
        channels (int): number of channels
        kernel_size (tuple): kernel sizes for Conv2d
        init_type (str): type of initialization
        noise_scale (float): scale of noise
        noise_type (str): type of noise. "frame_wise" or "bin_wise"
        padding_mode (str): padding mode
        smoothing_width (int): width of the smoothing window.
            The larger the value, the stronger the smoothing.
            Only used at inference time.
    """
    def __init__(
        self,
        in_dim=None,
        channels=128,
        kernel_size=(5, 5),
        init_type="kaiming_normal",
        noise_scale=1.0,
        noise_type="bin_wise",
        padding_mode="zeros",
        smoothing_width=-1,
    ):
        super().__init__()
        self.in_dim = in_dim
        self.noise_type = noise_type
        self.noise_scale = noise_scale
        C = channels
        self.smoothing_width = smoothing_width
        assert len(kernel_size) == 2
        ks = np.asarray(list(kernel_size))
        padding = (ks - 1) // 2

        self.conv1 = nn.Sequential(
            nn.Conv2d(
                2,
                C,
                kernel_size=ks,
                padding=padding,
                padding_mode=padding_mode,
            ),
            nn.ReLU(),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(
                C + 1, C * 2, kernel_size=ks, padding=padding, padding_mode=padding_mode
            ),
            nn.ReLU(),
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(
                C * 2 + 1, C, kernel_size=ks, padding=padding, padding_mode=padding_mode
            ),
            nn.ReLU(),
        )
        self.conv4 = nn.Conv2d(
            C + 1, 1, kernel_size=ks, padding=padding, padding_mode=padding_mode
        )

        if self.noise_type == "frame_wise":
            # noise: (B, T, 1)
            self.fc = nn.Linear(1, in_dim)
        elif self.noise_type == "bin_wise":
            # noise: (B, T, C)
            self.fc = None
        else:
            raise ValueError("Unknown noise type: {}".format(self.noise_type))

        init_weights(self, init_type)
    def forward(self, x, lengths=None, y=None, is_inference=False):
        """Forward step

        Args:
            x (torch.Tensor): input tensor of shape (B, T, C)
            lengths (torch.Tensor): lengths of shape (B,)
            y (torch.Tensor): optional target tensor (unused)
            is_inference (bool): whether to run in inference mode

        Returns:
            torch.Tensor: output tensor of shape (B, T, C)
        """
        # (B, T, C) -> (B, 1, T, C):
        x = x.unsqueeze(1)

        if self.noise_type == "bin_wise":
            # (B, C, T)
            z = torch.randn_like(x).squeeze(1).transpose(1, 2) * self.noise_scale
            # Apply moving average filter at inference time only
            if is_inference and self.smoothing_width > 0:
                ave_filt = MovingAverage1d(
                    self.in_dim, self.in_dim, self.smoothing_width
                ).to(x.device)
                z = ave_filt(z)
            # (B, 1, T, C)
            z = z.transpose(1, 2).unsqueeze(1)
        elif self.noise_type == "frame_wise":
            # (B, 1, T)
            z = torch.randn(x.shape[0], 1, x.shape[2]).to(x.device) * self.noise_scale
            # Apply moving average filter at inference time only
            if is_inference and self.smoothing_width > 0:
                ave_filt = MovingAverage1d(1, 1, self.smoothing_width).to(x.device)
                z = ave_filt(z)
            # (B, 1, T, 1)
            z = z.unsqueeze(-1)
            # (B, 1, T, C)
            z = self.fc(z)

        x_syn = x
        y = self.conv1(torch.cat([x_syn, z], dim=1))
        y = self.conv2(torch.cat([x_syn, y], dim=1))
        y = self.conv3(torch.cat([x_syn, y], dim=1))
        residual = self.conv4(torch.cat([x_syn, y], dim=1))
        # The post-filter predicts a residual to be added to the input
        out = x_syn + residual

        # (B, 1, T, C) -> (B, T, C)
        out = out.squeeze(1)
        return out
    def inference(self, x, lengths=None):
        return self(x, lengths, is_inference=True)
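

# A minimal usage sketch (illustration only): applying the Conv2d post-filter
# to random 60-dimensional feature trajectories. ``channels=16`` only keeps
# the example small; ``smoothing_width=5`` smooths the bin-wise noise at
# inference time.
def _example_conv2d_postfilter():
    model = Conv2dPostFilter(in_dim=60, channels=16, smoothing_width=5)
    x = torch.randn(2, 100, 60)  # (B, T, C)
    out = model.inference(x)  # same shape, with a predicted residual added
    assert out.shape == x.shape
    return out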


class MultistreamPostFilter(BaseModel):
    """A multi-stream post-filter that applies post-filtering to each feature stream

    Currently, post-filtering for MGC, BAP and log-F0 is supported.
    Note that it doesn't make much sense to apply post-filtering to other features.

    Args:
        mgc_postfilter (nn.Module): post-filter for MGC
        bap_postfilter (nn.Module): post-filter for BAP
        lf0_postfilter (nn.Module): post-filter for log-F0
        stream_sizes (list): sizes of each feature stream
        mgc_offset (int): offset for MGC. Defaults to 2.
        bap_offset (int): offset for BAP. Defaults to 0.
    """
    def __init__(
        self,
        mgc_postfilter: nn.Module,
        bap_postfilter: nn.Module,
        lf0_postfilter: nn.Module,
        stream_sizes: list,
        mgc_offset: int = 2,
        bap_offset: int = 0,
    ):
        super().__init__()
        self.mgc_postfilter = mgc_postfilter
        self.bap_postfilter = bap_postfilter
        self.lf0_postfilter = lf0_postfilter
        self.stream_sizes = stream_sizes
        self.mgc_offset = mgc_offset
        self.bap_offset = bap_offset
    def forward(self, x, lengths=None, y=None, is_inference=False):
        """Forward step

        Each feature stream is processed independently.

        Args:
            x (torch.Tensor): input tensor of shape (B, T, C)
            lengths (torch.Tensor): lengths of shape (B,)

        Returns:
            torch.Tensor: output tensor of shape (B, T, C)
        """
        streams = split_streams(x, self.stream_sizes)
        if len(streams) == 4:
            mgc, lf0, vuv, bap = streams
        elif len(streams) == 5:
            mgc, lf0, vuv, bap, vib = streams
        elif len(streams) == 6:
            mgc, lf0, vuv, bap, vib, vib_flags = streams
        else:
            raise ValueError("Invalid number of streams")

        if self.mgc_postfilter is not None:
            if self.mgc_offset > 0:
                # keep the 0-to-${mgc_offset}-th dims of mgc unchanged
                mgc0 = mgc[:, :, : self.mgc_offset]
                if is_inference:
                    mgc_pf = self.mgc_postfilter.inference(
                        mgc[:, :, self.mgc_offset :], lengths
                    )
                else:
                    mgc_pf = self.mgc_postfilter(mgc[:, :, self.mgc_offset :], lengths)
                mgc_pf = torch.cat([mgc0, mgc_pf], dim=-1)
            else:
                if is_inference:
                    mgc_pf = self.mgc_postfilter.inference(mgc, lengths)
                else:
                    mgc_pf = self.mgc_postfilter(mgc, lengths)
            mgc = mgc_pf

        if self.bap_postfilter is not None:
            if self.bap_offset > 0:
                # keep the 0-to-${bap_offset}-th dims of bap unchanged
                bap0 = bap[:, :, : self.bap_offset]
                if is_inference:
                    bap_pf = self.bap_postfilter.inference(
                        bap[:, :, self.bap_offset :], lengths
                    )
                else:
                    bap_pf = self.bap_postfilter(bap[:, :, self.bap_offset :], lengths)
                bap_pf = torch.cat([bap0, bap_pf], dim=-1)
            else:
                if is_inference:
                    bap_pf = self.bap_postfilter.inference(bap, lengths)
                else:
                    bap_pf = self.bap_postfilter(bap, lengths)
            bap = bap_pf

        if self.lf0_postfilter is not None:
            if is_inference:
                lf0 = self.lf0_postfilter.inference(lf0, lengths)
            else:
                lf0 = self.lf0_postfilter(lf0, lengths)

        if len(streams) == 4:
            out = torch.cat([mgc, lf0, vuv, bap], dim=-1)
        elif len(streams) == 5:
            out = torch.cat([mgc, lf0, vuv, bap, vib], dim=-1)
        elif len(streams) == 6:
            out = torch.cat([mgc, lf0, vuv, bap, vib, vib_flags], dim=-1)
        return out
    def inference(self, x, lengths):
        return self(x, lengths, is_inference=True)
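

# A minimal usage sketch (illustration only): post-filtering only the MGC
# stream of a WORLD-style feature vector. The stream sizes are hypothetical;
# with ``mgc_offset=2`` the MGC post-filter sees 60 - 2 = 58 dimensions.
def _example_multistream_postfilter():
    stream_sizes = [60, 1, 1, 5]  # mgc, lf0, vuv, bap
    model = MultistreamPostFilter(
        mgc_postfilter=Conv2dPostFilter(in_dim=58, channels=16),
        bap_postfilter=None,
        lf0_postfilter=None,
        stream_sizes=stream_sizes,
        mgc_offset=2,
    )
    x = torch.randn(2, 100, sum(stream_sizes))  # (B, T, C)
    out = model.inference(x, lengths=None)
    assert out.shape == x.shape
    return out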


class MelF0MultistreamPostFilter(BaseModel):
    """A multi-stream post-filter for mel-spectrogram and log-F0 streams

    Args:
        mel_postfilter (nn.Module): post-filter for mel-spectrogram
        lf0_postfilter (nn.Module): post-filter for log-F0
        stream_sizes (list): sizes of each feature stream
        mel_offset (int): offset for mel-spectrogram. Defaults to 0.
    """

    def __init__(
        self,
        mel_postfilter: nn.Module,
        lf0_postfilter: nn.Module,
        stream_sizes: list,
        mel_offset: int = 0,
    ):
        super().__init__()
        self.mel_postfilter = mel_postfilter
        self.lf0_postfilter = lf0_postfilter
        self.stream_sizes = stream_sizes
        self.mel_offset = mel_offset
    def forward(self, x, lengths=None, y=None, is_inference=False):
        """Forward step

        Each feature stream is processed independently.

        Args:
            x (torch.Tensor): input tensor of shape (B, T, C)
            lengths (torch.Tensor): lengths of shape (B,)

        Returns:
            torch.Tensor: output tensor of shape (B, T, C)
        """
        streams = split_streams(x, self.stream_sizes)
        assert len(streams) == 3
        mel, lf0, vuv = streams

        if self.mel_postfilter is not None:
            if self.mel_offset > 0:
                # keep the 0-to-${mel_offset}-th dims of mel unchanged
                mel0 = mel[:, :, : self.mel_offset]
                if is_inference:
                    mel_pf = self.mel_postfilter.inference(
                        mel[:, :, self.mel_offset :], lengths
                    )
                else:
                    mel_pf = self.mel_postfilter(mel[:, :, self.mel_offset :], lengths)
                mel_pf = torch.cat([mel0, mel_pf], dim=-1)
            else:
                if is_inference:
                    mel_pf = self.mel_postfilter.inference(mel, lengths)
                else:
                    mel_pf = self.mel_postfilter(mel, lengths)
            mel = mel_pf

        if self.lf0_postfilter is not None:
            if is_inference:
                lf0 = self.lf0_postfilter.inference(lf0, lengths)
            else:
                lf0 = self.lf0_postfilter(lf0, lengths)

        out = torch.cat([mel, lf0, vuv], dim=-1)
        return out
    def inference(self, x, lengths):
        return self(x, lengths, is_inference=True)


class _PadConv2dPostFilter(BaseModel):
    def __init__(
        self,
        in_dim=None,
        channels=128,
        kernel_size=5,
        init_type="kaiming_normal",
        padding_side="left",
    ):
        super().__init__()
        assert not isinstance(kernel_size, list)
        C = channels
        ks = kernel_size
        padding = (ks - 1) // 2
        self.padding = padding

        # Treat padding for the feature-axis carefully;
        # use normal padding for the time-axis (i.e., (padding, padding))
        self.padding_side = padding_side
        if padding_side == "left":
            self.pad = nn.ReflectionPad2d((padding, 0, padding, padding))
        elif padding_side == "none":
            self.pad = nn.ReflectionPad2d((0, 0, padding, padding))
        elif padding_side == "right":
            self.pad = nn.ReflectionPad2d((0, padding, padding, padding))
        else:
            raise ValueError("Invalid padding side")

        self.conv1 = nn.Sequential(
            nn.Conv2d(2, C, kernel_size=(ks, ks)),
            nn.ReLU(),
        )
        # NOTE: for the subsequent layers, use a fixed kernel_size of 3 for the feature-axis
        self.conv2 = nn.Sequential(
            nn.Conv2d(
                C + 1,
                C * 2,
                kernel_size=(ks, 3),
                padding=(padding, 1),
                padding_mode="reflect",
            ),
            nn.ReLU(),
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(
                C * 2 + 1,
                C,
                kernel_size=(ks, 3),
                padding=(padding, 1),
                padding_mode="reflect",
            ),
            nn.ReLU(),
        )
        self.conv4 = nn.Conv2d(
            C + 1, 1, kernel_size=(ks, 1), padding=(padding, 0), padding_mode="reflect"
        )
        self.fc = nn.Linear(1, in_dim)

        init_weights(self, init_type)
    def forward(self, x, z, lengths=None):
        # (B, T, C) -> (B, 1, T, C):
        x = x.unsqueeze(1)
        z = z.unsqueeze(1)
        # Expand the frame-wise noise to the feature dimension
        z = self.fc(z)

        x_syn = x
        y = self.conv1(torch.cat([self.pad(x_syn), self.pad(z)], dim=1))
        # Trim the overlap region so the output matches the target stream width
        if self.padding_side == "left":
            x_syn = x[:, :, :, : -self.padding]
        elif self.padding_side == "none":
            x_syn = x[:, :, :, self.padding : -self.padding]
        elif self.padding_side == "right":
            x_syn = x[:, :, :, self.padding :]
        y = self.conv2(torch.cat([x_syn, y], dim=1))
        y = self.conv3(torch.cat([x_syn, y], dim=1))
        residual = self.conv4(torch.cat([x_syn, y], dim=1))
        out = x_syn + residual

        # (B, 1, T, C) -> (B, T, C)
        out = out.squeeze(1)
        return out


class MultistreamConv2dPostFilter(nn.Module):
    """Conv2d-based multi-stream post-filter designed for MGC

    Divide the MGC transformation into low/mid/high dimension transformations
    with small overlaps. The overlap width is determined by the kernel size.

    Args:
        in_dim (int): dimension of input features
        channels (int): number of channels
        kernel_size (int): kernel size
        init_type (str): type of initialization
        noise_scale (float): scale of noise
        stream_sizes (tuple): sizes of the low/mid/high frequency streams
    """
    def __init__(
        self,
        in_dim=None,
        channels=128,
        kernel_size=5,
        init_type="kaiming_normal",
        noise_scale=1.0,
        stream_sizes=(8, 20, 30),
    ):
        super().__init__()
        assert len(stream_sizes) == 3
        self.padding = (kernel_size - 1) // 2
        self.noise_scale = noise_scale
        self.stream_sizes = stream_sizes

        self.low_postfilter = _PadConv2dPostFilter(
            stream_sizes[0] + self.padding,
            channels=channels,
            kernel_size=kernel_size,
            init_type=init_type,
            padding_side="left",
        )
        self.mid_postfilter = _PadConv2dPostFilter(
            stream_sizes[1] + 2 * self.padding,
            channels=channels,
            kernel_size=kernel_size,
            init_type=init_type,
            padding_side="none",
        )
        self.high_postfilter = _PadConv2dPostFilter(
            stream_sizes[2] + self.padding,
            channels=channels,
            kernel_size=kernel_size,
            init_type=init_type,
            padding_side="right",
        )
    def forward(self, x, lengths=None, y=None):
        assert x.shape[-1] == sum(self.stream_sizes)
        # (B, T, C)
        z = torch.randn(x.shape[0], x.shape[1], 1).to(x.device) * self.noise_scale

        # Process the three streams separately with an overlap width of ``padding``
        out1 = self.low_postfilter(x[:, :, : self.stream_sizes[0] + self.padding], z)
        out2 = self.mid_postfilter(
            x[
                :,
                :,
                self.stream_sizes[0]
                - self.padding : sum(self.stream_sizes[:2])
                + self.padding,
            ],
            z,
        )
        out3 = self.high_postfilter(
            x[:, :, sum(self.stream_sizes[:2]) - self.padding :], z
        )

        # Merge the three outputs
        out = torch.cat([out1, out2, out3], dim=-1)
        return out
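

# A minimal usage sketch (illustration only): the default stream sizes split a
# 58-dimensional MGC into 8/20/30-dimensional bands that are post-filtered
# separately with small overlaps and then concatenated.
def _example_multistream_conv2d_postfilter():
    model = MultistreamConv2dPostFilter(channels=16)
    x = torch.randn(2, 100, 58)  # (B, T, C) with C == 8 + 20 + 30
    out = model(x)
    assert out.shape == x.shape
    return out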