#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from typing import List, Dict, Any, Callable, Optional
from paddle.optimizer import Optimizer
import paddle.nn.functional as F
import numpy as np
import paddle
from paddlets.models.forecasting.dl.paddle_base_impl import PaddleBaseModelImpl
from paddlets.models.common.callbacks import Callback
from paddlets.logger import raise_if_not, Logger
from paddlets.datasets import TSDataset
# Key under which the past-target tensor is looked up in the input dict
# passed to `_LSTNetModule.forward`.
PAST_TARGET = "past_target"
class _LSTNetModule(paddle.nn.Layer):
    """LSTNet network structure.

    The forward pass combines a Conv1D feature extractor, a recurrent layer,
    a skip-recurrent layer and a linear autoregressive ("highway") branch
    whose output is added to the output of the neural branch.

    Args:
        in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
        target_dim(int): The number of targets.
        skip_size(int): Skip size for the skip RNN layer.
        channels(int): Number of channels for first layer Conv1D.
        kernel_size(int): Kernel size for first layer Conv1D.
        rnn_cell_type(str): Type of the RNN cell, either GRU or LSTM.
        rnn_num_cells(int): Number of RNN cells for each layer.
        skip_rnn_cell_type(str): Type of the RNN cell for the skip layer, either GRU or LSTM.
        skip_rnn_num_cells(int): Number of RNN cells for each layer for skip part.
        dropout_rate(float): Dropout regularization parameter.
        output_activation(str|None): The last activation to be used for output.
            Accepts either None (default no activation), sigmoid or tanh.

    Attributes:
        _in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        _skip_size(int): Skip size for the skip RNN layer.
        _channels(int): Number of channels for first layer Conv1D.
        _rnn_num_cells(int): Number of RNN cells for each layer.
        _skip_rnn_num_cells(int): Number of RNN cells for each layer for skip part.
        _conv_skip(int): Number of steps each skip sequence contains,
            i.e. ``(in_chunk_len - kernel_size) // skip_size``.
        _output_activation(str|None): The last activation to be used for output.
            Accepts either None (default no activation), sigmoid or tanh.
    """

    def __init__(
        self,
        in_chunk_len: int,
        out_chunk_len: int,
        target_dim: int,
        skip_size: int,
        channels: int,
        kernel_size: int,
        rnn_cell_type: str,
        rnn_num_cells: int,
        skip_rnn_cell_type: str,
        skip_rnn_num_cells: int,
        dropout_rate: float,
        output_activation: Optional[str] = None,
    ):
        super(_LSTNetModule, self).__init__()
        self._in_chunk_len = in_chunk_len
        self._channels = channels
        self._rnn_num_cells = rnn_num_cells
        self._skip_rnn_num_cells = skip_rnn_num_cells
        self._skip_size = skip_size
        self._output_activation = output_activation

        raise_if_not(
            channels > 0,
            "`channels` must be a positive integer"
        )
        # Checked explicitly: `skip_size` is used as a divisor below, and a
        # non-positive `kernel_size`/`skip_size` would otherwise surface as a
        # ZeroDivisionError or as a misleading "conv1d output size" error.
        raise_if_not(
            kernel_size > 0,
            "`kernel_size` must be a positive integer"
        )
        raise_if_not(
            skip_size > 0,
            "`skip_size` must be a positive integer"
        )
        raise_if_not(
            rnn_cell_type in ("GRU", "LSTM"),
            "`rnn_cell_type` must be either 'GRU' or 'LSTM'"
        )
        raise_if_not(
            skip_rnn_cell_type in ("GRU", "LSTM"),
            "`skip_rnn_cell_type` must be either 'GRU' or 'LSTM'"
        )

        # NOTE(review): an unpadded, stride-1 convolution produces
        # `in_chunk_len - kernel_size + 1` steps; this uses one fewer, so the
        # skip branch may ignore the oldest step. Presumably intentional
        # (changing it would alter outputs of trained models) - confirm.
        conv_out = in_chunk_len - kernel_size
        self._conv_skip = conv_out // skip_size
        raise_if_not(
            self._conv_skip > 0,
            "conv1d output size must be greater than or equal to `skip_size`\n" \
            "Choose a smaller `kernel_size` or bigger `in_chunk_len`"
        )
        if output_activation is not None:
            raise_if_not(
                output_activation in ("sigmoid", "tanh"),
                "`output_activation` must be either 'sigmoid' or 'tanh'"
            )

        rnn_layers = {"LSTM": paddle.nn.LSTM, "GRU": paddle.nn.GRU}
        self._cnn = paddle.nn.Conv1D(target_dim, channels, kernel_size, data_format="NLC")
        self._dropout = paddle.nn.Dropout(dropout_rate)
        self._rnn = rnn_layers[rnn_cell_type](channels, rnn_num_cells)
        self._skip_rnn = rnn_layers[skip_rnn_cell_type](channels, skip_rnn_num_cells)
        # The dense layer consumes the final RNN state concatenated with the
        # final states of the `skip_size` interleaved skip sequences.
        self._fc = paddle.nn.Linear(rnn_num_cells + skip_size * skip_rnn_num_cells, target_dim)
        # Autoregressive branch: maps the time axis in_chunk_len -> out_chunk_len.
        self._ar_fc = paddle.nn.Linear(in_chunk_len, out_chunk_len)

    def forward(
        self,
        X: Dict[str, paddle.Tensor]
    ) -> paddle.Tensor:
        """Forward.

        Args:
            X(Dict[str, paddle.Tensor]): Dict of feature tensor. Only the
                ``PAST_TARGET`` entry is read; it is assumed to be laid out
                as [batch, time, target_dim] (the Conv1D uses "NLC").

        Returns:
            paddle.Tensor: Output of model.
        """
        past_target = X[PAST_TARGET]

        # CNN
        cnn_out = self._cnn(past_target)  # [B, T_conv, channels]
        cnn_out = F.relu(cnn_out)
        cnn_out = self._dropout(cnn_out)

        # RNN: only the final state is used.
        _, rnn_out = self._rnn(cnn_out)
        # An LSTM returns its final states as a tuple; keep the first element.
        rnn_out = (
            rnn_out[0] if isinstance(rnn_out, tuple) else rnn_out
        )
        rnn_out = self._dropout(rnn_out)  # [1, B, rnn_num_cells]
        rnn_out = paddle.squeeze(rnn_out, axis=0)  # [B, rnn_num_cells]

        # Skip-RNN: keep the most recent conv_skip * skip_size steps and split
        # them into `skip_size` interleaved sequences of length `conv_skip`.
        skip_out = cnn_out[:, -self._conv_skip * self._skip_size:, :]
        skip_out = paddle.reshape(  # [B, conv_skip, skip, channels]
            skip_out,
            shape=[-1, self._conv_skip, self._skip_size, self._channels]
        )
        skip_out = paddle.transpose(skip_out, perm=[0, 2, 1, 3])  # [B, skip, conv_skip, channels]
        skip_out = paddle.reshape(  # [B * skip, conv_skip, channels]
            skip_out,
            shape=[-1, self._conv_skip, self._channels]
        )
        _, skip_out = self._skip_rnn(skip_out)  # final state: [1, B * skip, skip_rnn_num_cells]
        skip_out = (
            skip_out[0] if isinstance(skip_out, tuple) else skip_out
        )
        # Regroup the per-sequence states so each sample owns one row.
        skip_out = paddle.reshape(  # [B, skip * skip_rnn_num_cells]
            skip_out,
            shape=[-1, self._skip_size * self._skip_rnn_num_cells]
        )
        skip_out = self._dropout(skip_out)

        res = self._fc(
            paddle.concat([rnn_out, skip_out], axis=1)
        )  # [B, target_dim]
        # Add a time axis of size 1 so it broadcasts over the horizon below.
        res = paddle.unsqueeze(res, axis=1)  # [B, 1, target_dim]

        # Highway: a linear map applied along the time axis of the raw input.
        ar_in = past_target[:, -self._in_chunk_len:, :]
        ar_in = paddle.transpose(ar_in, perm=[0, 2, 1])  # [B, target_dim, in_chunk_len]
        ar_out = self._ar_fc(ar_in)  # [B, target_dim, out_chunk_len]
        ar_out = paddle.transpose(ar_out, perm=[0, 2, 1])  # [B, out_chunk_len, target_dim]

        out = ar_out + res
        if self._output_activation:
            out = (
                F.sigmoid(out) if self._output_activation == "sigmoid" else F.tanh(out)
            )
        return out
class LSTNetRegressor(PaddleBaseModelImpl):
    r"""LSTNet\[1\] is a time series forecasting model introduced in 2018. LSTNet uses the
    Convolution Neural Network (CNN) and the Recurrent Neural Network (RNN) to extract short-term local
    dependency patterns among variables and to discover long-term patterns for time series trends.

    \[1\] Lai G, et al. "Modeling Long- and Short-Term Temporal Patterns with Deep Neural Networks", `<https://arxiv.org/abs/1703.07015>`_

    Args:
        in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
        skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample.
            By default it will NOT skip any time steps.
        sampling_stride(int): Sampling intervals between two adjacent samples.
        loss_fn(Callable[..., paddle.Tensor]|None): Loss function.
        optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
        optimizer_params(Dict[str, Any]|None): Optimizer parameters.
            None (default) means ``dict(learning_rate=1e-3)``.
        eval_metrics(List[str]|None): Evaluation metrics of model.
            None (default) means an empty list.
        callbacks(List[Callback]|None): Customized callback functions.
            None (default) means an empty list.
        batch_size(int): Number of samples per batch.
        max_epochs(int): Max epochs during training.
        verbose(int): Verbosity mode.
        patience(int): Number of epochs to wait for improvement before terminating.
        seed(int|None): Global random seed.
        skip_size(int): Skip size for the skip RNN layer.
        channels(int): Number of channels for first layer Conv1D.
        kernel_size(int): Kernel size for first layer Conv1D.
        rnn_cell_type(str): Type of the RNN cell, either GRU or LSTM.
        rnn_num_cells(int): Number of RNN cells for each layer.
        skip_rnn_cell_type(str): Type of the RNN cell for the skip layer, either GRU or LSTM.
        skip_rnn_num_cells(int): Number of RNN cells for each layer for skip part.
        dropout_rate(float): Dropout regularization parameter.
        output_activation(str|None): The last activation to be used for output.
            Accepts either None (default no activation), sigmoid or tanh.

    Attributes:
        _in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        _out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
        _skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample.
            By default it will NOT skip any time steps.
        _sampling_stride(int): Sampling intervals between two adjacent samples.
        _loss_fn(Callable[..., paddle.Tensor]): Loss function.
        _optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
        _optimizer_params(Dict[str, Any]): Optimizer parameters.
        _eval_metrics(List[str]): Evaluation metrics of model.
        _callbacks(List[Callback]): Customized callback functions.
        _batch_size(int): Number of samples per batch.
        _max_epochs(int): Max epochs during training.
        _verbose(int): Verbosity mode.
        _patience(int): Number of epochs to wait for improvement before terminating.
        _seed(int|None): Global random seed.
        _stop_training(bool): Training status.
        _skip_size(int): Skip size for the skip RNN layer.
        _channels(int): Number of channels for first layer Conv1D.
        _kernel_size(int): Kernel size for first layer Conv1D.
        _rnn_cell_type(str): Type of the RNN cell, either GRU or LSTM.
        _rnn_num_cells(int): Number of RNN cells for each layer.
        _skip_rnn_cell_type(str): Type of the RNN cell for the skip layer, either GRU or LSTM.
        _skip_rnn_num_cells(int): Number of RNN cells for each layer for skip part.
        _dropout_rate(float): Dropout regularization parameter.
        _output_activation(str|None): The last activation to be used for output.
            Accepts either None (default no activation), sigmoid or tanh.
    """

    def __init__(
        self,
        in_chunk_len: int,
        out_chunk_len: int,
        skip_chunk_len: int = 0,
        sampling_stride: int = 1,
        loss_fn: Callable[..., paddle.Tensor] = F.mse_loss,
        optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.Adam,
        optimizer_params: Optional[Dict[str, Any]] = None,
        eval_metrics: Optional[List[str]] = None,
        callbacks: Optional[List[Callback]] = None,
        batch_size: int = 32,
        max_epochs: int = 100,
        verbose: int = 1,
        patience: int = 10,
        seed: Optional[int] = None,
        skip_size: int = 1,
        channels: int = 1,
        kernel_size: int = 3,
        rnn_cell_type: str = "GRU",
        rnn_num_cells: int = 10,
        skip_rnn_cell_type: str = "GRU",
        skip_rnn_num_cells: int = 10,
        dropout_rate: float = 0.2,
        output_activation: Optional[str] = None
    ):
        # Build fresh containers per instance: mutable objects used directly
        # as default values would be shared by every model created without
        # these arguments.
        if optimizer_params is None:
            optimizer_params = dict(learning_rate=1e-3)
        if eval_metrics is None:
            eval_metrics = []
        if callbacks is None:
            callbacks = []

        # Network hyper-parameters, consumed later by `_init_network`.
        # NOTE(review): assigned before the base initializer runs, as in the
        # original; presumably the base class may rely on that - confirm.
        self._skip_size = skip_size
        self._channels = channels
        self._kernel_size = kernel_size
        self._rnn_cell_type = rnn_cell_type
        self._rnn_num_cells = rnn_num_cells
        self._skip_rnn_cell_type = skip_rnn_cell_type
        self._skip_rnn_num_cells = skip_rnn_num_cells
        self._dropout_rate = dropout_rate
        self._output_activation = output_activation
        super(LSTNetRegressor, self).__init__(
            in_chunk_len=in_chunk_len,
            out_chunk_len=out_chunk_len,
            skip_chunk_len=skip_chunk_len,
            sampling_stride=sampling_stride,
            loss_fn=loss_fn,
            optimizer_fn=optimizer_fn,
            optimizer_params=optimizer_params,
            eval_metrics=eval_metrics,
            callbacks=callbacks,
            batch_size=batch_size,
            max_epochs=max_epochs,
            verbose=verbose,
            patience=patience,
            seed=seed,
        )

    def _check_tsdataset(
        self,
        tsdataset: TSDataset
    ):
        """Check that every column of the dataset has a floating dtype.

        Target columns and covariate (observed or known) columns are both
        required to be floating point; any other dtype is rejected through
        `raise_if_not`. Further checks are delegated to the base class.

        Args:
            tsdataset(TSDataset): Data to be checked.
        """
        target_columns = tsdataset.get_target().dtypes.keys()
        for column, dtype in tsdataset.dtypes.items():
            # Same requirement for both kinds of column; only the error
            # message differs.
            if column in target_columns:
                raise_if_not(
                    np.issubdtype(dtype, np.floating),
                    f"lstnet's target dtype only supports [float16, float32, float64], " \
                    f"but received {column}: {dtype}."
                )
                continue
            raise_if_not(
                np.issubdtype(dtype, np.floating),
                f"lstnet's cov(observed or known) dtype currently only supports [float16, float32, float64], " \
                f"but received {column}: {dtype}."
            )
        super(LSTNetRegressor, self)._check_tsdataset(tsdataset)

    def _update_fit_params(
        self,
        train_tsdataset: List[TSDataset],
        valid_tsdataset: Optional[List[TSDataset]] = None
    ) -> Dict[str, Any]:
        """Infer parameters by TSdataset automatically.

        Only the first training dataset is inspected; `valid_tsdataset` is
        not used here.

        Args:
            train_tsdataset(List[TSDataset]): list of train dataset.
            valid_tsdataset(List[TSDataset]|None): list of validation dataset.

        Returns:
            Dict[str, Any]: model parameters, currently just ``target_dim``
                (the number of target columns).
        """
        target_dim = train_tsdataset[0].get_target().data.shape[1]
        fit_params = {
            "target_dim": target_dim
        }
        return fit_params

    def _init_network(self) -> paddle.nn.Layer:
        """Setup the network.

        Builds a `_LSTNetModule` from the stored hyper-parameters and the
        ``target_dim`` entry of `self._fit_params`.

        Returns:
            paddle.nn.Layer.
        """
        return _LSTNetModule(
            in_chunk_len=self._in_chunk_len,
            out_chunk_len=self._out_chunk_len,
            target_dim=self._fit_params["target_dim"],
            skip_size=self._skip_size,
            channels=self._channels,
            kernel_size=self._kernel_size,
            rnn_cell_type=self._rnn_cell_type,
            rnn_num_cells=self._rnn_num_cells,
            skip_rnn_cell_type=self._skip_rnn_cell_type,
            skip_rnn_num_cells=self._skip_rnn_num_cells,
            dropout_rate=self._dropout_rate,
            output_activation=self._output_activation
        )