Source code for paddlets.models.forecasting.dl.informer
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from typing import List, Dict, Any, Callable, Optional, Tuple
from paddle.optimizer import Optimizer
import paddle.nn.functional as F
import numpy as np
import paddle
from paddlets.models.forecasting.dl._informer import MixedEmbedding, Informer
from paddlets.models.forecasting.dl.paddle_base_impl import PaddleBaseModelImpl
from paddlets.models.common.callbacks import Callback
from paddlets.logger import raise_if_not
from paddlets.datasets import TSDataset
# Key under which the past target tensor is looked up in the feature dict passed to the network.
PAST_TARGET = "past_target"
class _InformerModule(paddle.nn.Layer):
    """Paddle layer implementing the informer module.

    Args:
        in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
        start_token_len(int): The start token size of the forecasting horizon.
        target_dim(int): The number of targets.
        d_model(int): The expected feature size for the input/output of informer's encoder/decoder.
        nhead(int): The number of heads in the multi-head attention mechanism.
        ffn_channels(int): The number of channels for Conv1D of FFN layer.
        num_encoder_layers(int): The number of encoder layers in the encoder.
        num_decoder_layers(int): The number of decoder layers in the decoder.
        activation(str): The activation function of encoder/decoder intermediate layer,
            ["relu", "gelu"] is optional.
        dropout_rate(float): Fraction of neurons affected by Dropout.

    Attributes:
        _in_chunk_len(int): The size of the loopback window.
        _out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
        _start_token_len(int): The start token size of the forecasting horizon.
        _target_dim(int): The number of targets.
        _src_embedding(paddle.nn.Layer): Embedding applied to the encoder input.
        _tgt_embedding(paddle.nn.Layer): Embedding applied to the decoder input.
        _informer(paddle.nn.Layer): The `Informer` encoder-decoder network.
        _out_proj(paddle.nn.Layer): The projection layer mapping `d_model` features to `target_dim`.
    """

    def __init__(
        self,
        in_chunk_len: int,
        out_chunk_len: int,
        start_token_len: int,
        target_dim: int,
        d_model: int,
        nhead: int,
        ffn_channels: int,
        num_encoder_layers: int,
        num_decoder_layers: int,
        activation: str,
        dropout_rate: float,
    ):
        super(_InformerModule, self).__init__()
        # The start token is sliced from the tail of the input window,
        # so it cannot be longer than the window itself.
        raise_if_not(
            in_chunk_len >= start_token_len,
            "`in_chunk_len` must be greater than or equal to `start_token_len`\n"
            "Choose a smaller `start_token_len` or bigger `in_chunk_len`."
        )
        self._in_chunk_len = in_chunk_len
        self._out_chunk_len = out_chunk_len
        self._start_token_len = start_token_len
        self._target_dim = target_dim

        # Embedding step: one embedding per side, since the encoder sees `in_chunk_len`
        # steps while the decoder sees `start_token_len + out_chunk_len` steps.
        self._src_embedding = MixedEmbedding(target_dim, d_model, in_chunk_len, dropout_rate)
        self._tgt_embedding = MixedEmbedding(
            target_dim, d_model, start_token_len + out_chunk_len, dropout_rate
        )

        # Informer step: consumes the embedded src/tgt sequences.
        self._informer = Informer(
            d_model=d_model,
            nhead=nhead,
            ffn_channels=ffn_channels,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            activation=activation,
            dropout_rate=dropout_rate,
        )

        # Projection step: map model features back to the target dimension.
        self._out_proj = paddle.nn.Linear(d_model, target_dim)

    def _create_informer_inputs(
        self,
        X: Dict[str, paddle.Tensor]
    ) -> Tuple[paddle.Tensor, paddle.Tensor]:
        """Build the encoder and decoder inputs from the past target.

        The past target is expected to be a 3-D tensor laid out as
        (batch_size, in_chunk_len, target_dim). The decoder input is the last
        `start_token_len` steps of the past target followed by `out_chunk_len`
        steps of zeros, i.e. a tensor with `start_token_len + out_chunk_len` steps.

        Args:
            X(Dict[str, paddle.Tensor]): Dict of feature tensor.

        Returns:
            Tuple[paddle.Tensor, paddle.Tensor]: The inputs for the encoder and decoder.
        """
        src = X[PAST_TARGET]
        batch_size, _, target_dim = src.shape
        token_start = self._in_chunk_len - self._start_token_len
        start_token = src[:, token_start:, :]
        # Create the placeholder with the dtype of `src`: `paddle.concat` requires
        # every input tensor to share the same data type.
        padding = paddle.zeros(
            [batch_size, self._out_chunk_len, target_dim], dtype=src.dtype
        )
        tgt = paddle.concat([start_token, padding], axis=1)
        return src, tgt

    def forward(
        self,
        X: Dict[str, paddle.Tensor]
    ) -> paddle.Tensor:
        """Forward.

        Args:
            X(Dict[str, paddle.Tensor]): Dict of feature tensor.

        Returns:
            paddle.Tensor: Output of model, restricted to the last `out_chunk_len` time steps.
        """
        # Create `src` and `tgt`, the inputs for the encoder and decoder side
        # of the informer architecture.
        src, tgt = self._create_informer_inputs(X)
        src = self._src_embedding(src)
        tgt = self._tgt_embedding(tgt)
        out = self._informer(src, tgt)
        out = self._out_proj(out)
        # The decoder output also covers the start token steps, so only the
        # last `out_chunk_len` time steps are kept as the prediction.
        return out[:, -self._out_chunk_len:, :]
class InformerModel(PaddleBaseModelImpl):
    r"""Informer\[1\] is a state-of-the-art deep learning model introduced in 2021.
    It is an encoder-decoder architecture whose core feature is the `prob sparse attention` mechanism,
    which achieves the O(LlogL) time complexity and O(LlogL) memory usage on dependency alignments.

    \[1\] Zhou H, et al. "Informer: Beyond efficient transformer for long sequence time-series forecasting", `<https://arxiv.org/abs/2012.07436>`_

    Args:
        in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
        start_token_len(int): The start token size of the forecasting horizon.
        skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample.
            By default it will NOT skip any time steps.
        sampling_stride(int): Sampling intervals between two adjacent samples.
        loss_fn(Callable[..., paddle.Tensor]|None): Loss function.
        optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
        optimizer_params(Dict[str, Any]): Optimizer parameters.
        eval_metrics(List[str]): Evaluation metrics of model.
        callbacks(List[Callback]): Customized callback functions.
        batch_size(int): Number of samples per batch.
        max_epochs(int): Max epochs during training.
        verbose(int): Verbosity mode.
        patience(int): Number of epochs to wait for improvement before terminating.
        seed(int|None): Global random seed.
        d_model(int): The expected feature size for the input/output of the informer's encoder/decoder.
        nhead(int): The number of heads in the multi-head attention mechanism.
        ffn_channels(int): The number of channels for Conv1D of FFN layer.
        num_encoder_layers(int): The number of encoder layers in the encoder.
        num_decoder_layers(int): The number of decoder layers in the decoder.
        activation(str): The activation function of encoder/decoder intermediate layer,
            ["relu", "gelu"] is optional.
        dropout_rate(float): Fraction of neurons affected by Dropout.

    Attributes:
        _in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        _out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
        _start_token_len(int): The start token size of the forecasting horizon.
        _skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample.
            By default it will NOT skip any time steps.
        _sampling_stride(int): Sampling intervals between two adjacent samples.
        _loss_fn(Callable[..., paddle.Tensor]|None): Loss function.
        _optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
        _optimizer_params(Dict[str, Any]): Optimizer parameters.
        _eval_metrics(List[str]): Evaluation metrics of model.
        _callbacks(List[Callback]): Customized callback functions.
        _batch_size(int): Number of samples per batch.
        _max_epochs(int): Max epochs during training.
        _verbose(int): Verbosity mode.
        _patience(int): Number of epochs to wait for improvement before terminating.
        _seed(int|None): Global random seed.
        _stop_training(bool): Training status.
        _d_model(int): The expected feature size for the input/output of the informer's encoder/decoder.
        _nhead(int): The number of heads in the multi-head attention mechanism.
        _ffn_channels(int): The number of channels for Conv1D of FFN layer.
        _num_encoder_layers(int): The number of encoder layers in the encoder.
        _num_decoder_layers(int): The number of decoder layers in the decoder.
        _activation(str): The activation function of encoder/decoder intermediate layer.
            ["relu", "gelu"] is optional.
        _dropout_rate(float): Fraction of neurons affected by Dropout.
    """

    def __init__(
        self,
        in_chunk_len: int,
        out_chunk_len: int,
        start_token_len: int = 0,
        skip_chunk_len: int = 0,
        sampling_stride: int = 1,
        loss_fn: Callable[..., paddle.Tensor] = F.mse_loss,
        optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.Adam,
        optimizer_params: Dict[str, Any] = dict(learning_rate=1e-3),
        eval_metrics: List[str] = [],
        callbacks: List[Callback] = [],
        batch_size: int = 128,
        max_epochs: int = 10,
        verbose: int = 1,
        patience: int = 4,
        seed: Optional[int] = None,
        d_model: int = 512,
        nhead: int = 8,
        ffn_channels: int = 2048,
        num_encoder_layers: int = 2,
        num_decoder_layers: int = 1,
        activation: str = "relu",
        dropout_rate: float = 0.1,
    ):
        # Network hyper-parameters are stored before calling the parent
        # constructor, mirroring the original statement order.
        self._start_token_len = start_token_len
        self._d_model = d_model
        self._nhead = nhead
        self._ffn_channels = ffn_channels
        self._num_encoder_layers = num_encoder_layers
        self._num_decoder_layers = num_decoder_layers
        self._activation = activation
        self._dropout_rate = dropout_rate
        # The container defaults in the signature are created once and shared by
        # every call. Hand the parent class its own shallow copies so that state
        # can never leak between model instances through those defaults.
        # `None` is passed through untouched.
        super(InformerModel, self).__init__(
            in_chunk_len=in_chunk_len,
            out_chunk_len=out_chunk_len,
            skip_chunk_len=skip_chunk_len,
            sampling_stride=sampling_stride,
            loss_fn=loss_fn,
            optimizer_fn=optimizer_fn,
            optimizer_params=None if optimizer_params is None else dict(optimizer_params),
            eval_metrics=None if eval_metrics is None else list(eval_metrics),
            callbacks=None if callbacks is None else list(callbacks),
            batch_size=batch_size,
            max_epochs=max_epochs,
            verbose=verbose,
            patience=patience,
            seed=seed,
        )

    def _check_tsdataset(
        self,
        tsdataset: TSDataset
    ):
        """Check that every column of the dataset has a floating dtype.

        Target columns and covariate columns are both required to be floating;
        a violation is reported through `raise_if_not` with a message naming the
        offending column and its dtype. Afterwards the parent class check is run.

        Args:
            tsdataset(TSDataset): Data to be checked.
        """
        target_columns = tsdataset.get_target().dtypes.keys()
        for column, dtype in tsdataset.dtypes.items():
            # Only the message differs between target and covariate columns.
            if column in target_columns:
                message = (
                    f"informer's target dtype only supports [float16, float32, float64], "
                    f"but received {column}: {dtype}."
                )
            else:
                message = (
                    f"informer's cov(observed or known) dtype currently only supports [float16, float32, float64], "
                    f"but received {column}: {dtype}."
                )
            raise_if_not(np.issubdtype(dtype, np.floating), message)
        super(InformerModel, self)._check_tsdataset(tsdataset)

    def _update_fit_params(
        self,
        train_tsdataset: List[TSDataset],
        valid_tsdataset: Optional[List[TSDataset]] = None
    ) -> Dict[str, Any]:
        """Infer parameters from the TSDataset automatically.

        Only the first training dataset is inspected; `valid_tsdataset` is not used here.

        Args:
            train_tsdataset(List[TSDataset]): list of train dataset.
            valid_tsdataset(List[TSDataset]|None): list of validation dataset.

        Returns:
            Dict[str, Any]: model parameters, containing the key "target_dim".
        """
        # The second axis of the target data is the number of target columns.
        target_dim = train_tsdataset[0].get_target().data.shape[1]
        return {
            "target_dim": target_dim
        }

    def _init_network(self) -> paddle.nn.Layer:
        """Setup the network.

        Returns:
            paddle.nn.Layer: An `_InformerModule` built from the stored hyper-parameters
                and the fitted "target_dim".
        """
        return _InformerModule(
            in_chunk_len=self._in_chunk_len,
            out_chunk_len=self._out_chunk_len,
            start_token_len=self._start_token_len,
            target_dim=self._fit_params["target_dim"],
            d_model=self._d_model,
            nhead=self._nhead,
            ffn_channels=self._ffn_channels,
            num_encoder_layers=self._num_encoder_layers,
            num_decoder_layers=self._num_decoder_layers,
            activation=self._activation,
            dropout_rate=self._dropout_rate,
        )