#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from typing import List, Dict, Any, Callable, Optional
from functools import partial
from paddle.optimizer import Optimizer
import paddle.nn.functional as F
import numpy as np
import paddle
from paddlets.models.forecasting.dl.paddle_base_impl import PaddleBaseModelImpl
from paddlets.models.common.callbacks import Callback
from paddlets.logger import raise_if_not, Logger
from paddlets.datasets import TSDataset
logger = Logger(__name__)
PAST_TARGET = "past_target"
class _TemporalBlock(paddle.nn.Layer):
"""Paddle layer implementing a residual block.
Args:
in_channels(int): The number of channels in the input.
out_channels(int): The number of filter. It is as same as the output feature map.
kernel_size(int): The filter size.
dilation(int): The dilation size.
dropout_rate(float): Probability of setting units to zero.
Attributes:
_conv1(paddle.nn.Layer): 1D convolution Layer.
_conv2(paddle.nn.Layer): 1D convolution Layer.
_downsample(paddle.nn.Layer): 1D convolution Layer.
_dropout(paddle.nn.Layer): Probability of setting units to zero.
_padding(int): The size of zeros to be padded.
"""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: int,
dilation: int,
dropout_rate: float,
):
super(_TemporalBlock, self).__init__()
self._conv1 = paddle.nn.Conv1D(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
dilation=dilation
)
self._conv2 = paddle.nn.Conv1D(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=kernel_size,
dilation=dilation
)
self._downsample = paddle.nn.Conv1D(
in_channels, out_channels, 1
) if in_channels != out_channels else None
self._conv1 = paddle.nn.utils.weight_norm(self._conv1)
self._conv2 = paddle.nn.utils.weight_norm(self._conv2)
self._dropout = paddle.nn.Dropout(dropout_rate)
self._padding = dilation * (kernel_size - 1)
def forward(
self,
X: paddle.Tensor
) -> paddle.Tensor:
"""Forward.
Args:
X(paddle.Tensor): Feature tensor.
Returns:
paddle.Tensor: Output of model
"""
# In order to deal with the dimension mismatch during residual addition,
# use upsampling or downsampling to ensure that the input channel and output channel dimensions match.
residual = (
self._downsample(X) if self._downsample else X
)
# TCN is based on two principles:
# 1> The convolution network produces an output of the same length as the input (by padding the input).
# 2> No future information leakage.
# The pad layer is used to pad data to ensure that future information is not used.
out = F.pad(X, (self._padding, 0), data_format="NCL")
out = F.relu(self._conv1(out))
out = self._dropout(out)
out = F.pad(out, (self._padding, 0), data_format="NCL")
out = F.relu(self._conv2(out))
out = self._dropout(out)
out = out + residual
return out
class _TCNModule(paddle.nn.Layer):
"""Paddle layer implementing TCN module.
Args:
in_chunk_len(int): The size of the loopback window, i.e. the number of time steps feed to the model.
out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
target_dim(int): The numer of targets.
hidden_config(List[int]): The config of channels.
kernel_size(int): The filter size.
dropout_rate(float): Probability of setting units to zero.
Attrubutes:
_temporal_layers(paddle.nn.LayerList): Dynamic graph LayerList.
"""
def __init__(
self,
in_chunk_len: int,
out_chunk_len: int,
target_dim: int,
hidden_config: List[int],
kernel_size: int,
dropout_rate: float,
):
super(_TCNModule, self).__init__()
self._out_chunk_len = out_chunk_len
raise_if_not(
1 < kernel_size <= in_chunk_len,
f"The valid range of `kernel_size` is (1, in_chunk_len], " \
f"got kernel_size:{kernel_size} <= 1 or kernel_size:{kernel_size} > in_chunk_len:{in_chunk_len}."
)
raise_if_not(
out_chunk_len <= in_chunk_len,
f"The `out_chunk_len` must be <= `in_chunk_len`, "
f"got out_chunk_len:{out_chunk_len} > in_chunk_len:{in_chunk_len}."
)
if hidden_config is None:
# If hidden_config is not passed, compute number of layers needed for full history coverage.
num_layers = np.ceil(
np.log2((in_chunk_len - 1) / (kernel_size - 1) / 2 + 1)
)
hidden_config = [target_dim] * (int(num_layers) - 1)
else:
# If hidden_config is passed, compute the receptive field.
num_layers = len(hidden_config) + 1
receptive_filed = 1 + 2 * (kernel_size - 1) * (2 ** num_layers - 1)
if receptive_filed > in_chunk_len:
logger.warning("The receptive field of TCN exceeds the in_chunk_len.")
raise_if_not(
np.any(np.array(hidden_config) > 0),
f"hidden_config must be > 0, got {hidden_config}."
)
channels, temporal_layers = [target_dim] + hidden_config + [target_dim], []
for k, (in_channel, out_channel) in \
enumerate(zip(channels[:-1], channels[1:])):
temporal_layer = _TemporalBlock(
in_channels=in_channel,
out_channels=out_channel,
kernel_size=kernel_size,
dilation=(2 ** k),
dropout_rate=dropout_rate,
)
temporal_layers.append(temporal_layer)
self._temporal_layers = paddle.nn.Sequential(*temporal_layers)
def forward(
self,
X: Dict[str, paddle.Tensor]
) -> paddle.Tensor:
"""Forward.
Args:
X(Dict[str, paddle.Tensor]): Dict of feature tensor.
Returns:
paddle.Tensor: Output of model
"""
out = X[PAST_TARGET]
out = paddle.transpose(out, perm=[0, 2, 1])
out = self._temporal_layers(out)
out = paddle.transpose(out, perm=[0, 2, 1])
out = out[:, -self._out_chunk_len:, :]
return out
[docs]class TCNRegressor(PaddleBaseModelImpl):
"""Temporal Convolution Net\[1\].
\[1\] Bai S, et al. "An empirical evaluation of generic convolutional and recurrent networks for sequence modeling",
`<https://arxiv.org/pdf/1803.01271>`_
Args:
in_chunk_len(int): The size of the loopback window, i.e. the number of time steps feed to the model.
out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample.
The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample.
By default it will NOT skip any time steps.
sampling_stride(int): Sampling intervals between two adjacent samples.
loss_fn(Callable[..., paddle.Tensor]): Loss function.
optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
optimizer_params(Dict[str, Any]): Optimizer parameters.
eval_metrics(List[str]): Evaluation metrics of model.
callbacks(List[Callback]): Customized callback functions.
batch_size(int): Number of samples per batch.
max_epochs(int): Max epochs during training.
verbose(int): Verbosity mode.
patience(int): Number of epochs to wait for improvement before terminating.
seed(int|None): Global random seed.
hidden_config(List[int]|None): Hidden layer configuration.
kernel_size(int): The filter size.
dropout_rate(float): Probability of setting units to zero.
Attributes:
_in_chunk_len(int): The size of the loopback window, i.e. the number of time steps feed to the model.
_out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
_skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample.
The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample.
By default it will NOT skip any time steps.
_sampling_stride(int): Sampling intervals between two adjacent samples.
_loss_fn(Callable[..., paddle.Tensor]): Loss function.
_optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
_optimizer_params(Dict[str, Any]): Optimizer parameters.
_eval_metrics(List[str]): Evaluation metrics of model.
_callbacks(List[Callback]): Customized callback functions.
_batch_size(int): Number of samples per batch.
_max_epochs(int): Max epochs during training.
_verbose(int): Verbosity mode.
_patience(int): Number of epochs to wait for improvement before terminating.
_seed(int|None): Global random seed.
_stop_training(bool) Training status.
_hidden_config(List[int]|None): Hidden layer configuration.
_kernel_size(int): The filter size.
_dropout_rate(float): Probability of setting units to zero.
"""
def __init__(
self,
in_chunk_len: int,
out_chunk_len: int,
skip_chunk_len: int = 0,
sampling_stride: int = 1,
loss_fn: Callable[..., paddle.Tensor] = F.mse_loss,
optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.Adam,
optimizer_params: Dict[str, Any] = dict(learning_rate=1e-3),
eval_metrics: List[str] = [],
callbacks: List[Callback] = [],
batch_size: int = 32,
max_epochs: int = 100,
verbose: int = 1,
patience: int = 10,
seed: Optional[int] = None,
hidden_config: List[int] = None,
kernel_size: int = 3,
dropout_rate: float = 0.2,
):
self._hidden_config = hidden_config
self._kernel_size = kernel_size
self._dropout_rate = dropout_rate
super(TCNRegressor, self).__init__(
in_chunk_len=in_chunk_len,
out_chunk_len=out_chunk_len,
skip_chunk_len=skip_chunk_len,
sampling_stride=sampling_stride,
loss_fn=loss_fn,
optimizer_fn=optimizer_fn,
optimizer_params=optimizer_params,
eval_metrics=eval_metrics,
callbacks=callbacks,
batch_size=batch_size,
max_epochs=max_epochs,
verbose=verbose,
patience=patience,
seed=seed,
)
def _check_tsdataset(
self,
tsdataset: TSDataset
):
"""Ensure the robustness of input data (consistent feature order), at the same time,
check whether the data types are compatible. If not, the processing logic is as follows:
1> Integer: Convert to np.int64.
2> Floating: Convert to np.float32.
3> Missing value: Warning.
4> Other: Illegal.
Args:
tsdataset(TSDataset): Data to be checked.
"""
target_columns = tsdataset.get_target().dtypes.keys()
for column, dtype in tsdataset.dtypes.items():
if column in target_columns:
raise_if_not(
np.issubdtype(dtype, np.floating),
f"tcn's target dtype only supports [float16, float32, float64], " \
f"but received {column}: {dtype}."
)
continue
raise_if_not(
np.issubdtype(dtype, np.floating),
f"tcn's cov(observed or known) dtype currently only supports [float16, float32, float64], " \
f"but received {column}: {dtype}."
)
super(TCNRegressor, self)._check_tsdataset(tsdataset)
def _update_fit_params(
self,
train_tsdataset: List[TSDataset],
valid_tsdataset: Optional[List[TSDataset]] = None
) -> Dict[str, Any]:
"""Infer parameters by TSdataset automatically.
Args:
train_tsdataset(List[TSDataset]): list of train dataset.
valid_tsdataset(List[TSDataset]|None): list of validation dataset.
Returns:
Dict[str, Any]: model parameters
"""
target_dim = train_tsdataset[0].get_target().data.shape[1]
fit_params = {
"target_dim": target_dim
}
return fit_params
def _init_network(self) -> paddle.nn.Layer:
"""Setup the network.
Returns:
paddle.nn.Layer.
"""
return _TCNModule(
in_chunk_len=self._in_chunk_len,
out_chunk_len=self._out_chunk_len,
target_dim=self._fit_params["target_dim"],
hidden_config=self._hidden_config,
kernel_size=self._kernel_size,
dropout_rate=self._dropout_rate,
)