#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from collections import OrderedDict
from typing import Optional, Dict, Any, Callable, List, Tuple
import paddle
import numpy as np
from paddlets import TSDataset
from paddlets.logger import Logger, raise_if, raise_if_not, raise_log
from paddlets.models.forecasting.dl.paddle_base_impl import PaddleBaseModelImpl
from paddlets.metrics.base import Metric
from paddlets.metrics.metrics import MetricContainer
from paddlets.models.common.callbacks import Callback
logger = Logger(__name__)
class _Splitter(paddle.nn.Layer):
"""
Time series split module, split raw sequence to even and odd sub-sequences.
"""
def __init__(self):
super(_Splitter, self).__init__()
def _even(self, x: paddle.Tensor) -> paddle.Tensor:
"""
Down-sample and get the sub-sequence from input sequence x, where each element in sub-sequence is sampled
from input x's even index.
Args:
x(paddle.Tensor): Input Sequence to be down-sampled.
Returns:
paddle.Tensor: down-sampled sub-sequence.
"""
return x[:, ::2, :]
def _odd(self, x: paddle.Tensor):
"""
Down-sample and get the sub-sequence from input sequence x, where each element in sub-sequence is sampled
from input x's odd index.
Args:
x(paddle.Tensor): Input Sequence to be down-sampled.
Returns:
paddle.Tensor: down-sampled sub-sequence.
"""
return x[:, 1::2, :]
def forward(self, x: paddle.Tensor) -> Tuple[paddle.Tensor, paddle.Tensor]:
"""
Down-sample and build the even / odd sub-sequences from input sequence.
Args:
x(paddle.Tensor): input sequence to be down-sampled.
Returns:
Tuple[paddle.Tensor, paddle.Tensor]: A two-element tuple, where 1-st element is even sub-sequence, 2-nd
element is odd sub-sequence.
"""
return self._even(x), self._odd(x)
class _Interactor(paddle.nn.Layer):
"""
Interactive Learning module, perform convolutional and interactive process on even / odd sub-sequences.
Args:
in_planes(int): The size of the loopback window, i.e., the number of time steps feed to the model.
kernel_size(int): kernel size for Conv1D layer.
dropout_rate(float): dropout regularization parameter.
num_group(int): group number for Conv1D layer groups parameter.
hidden_size(int): The number of features in hidden state.
"""
def __init__(
self,
in_planes: int,
kernel_size: int,
dropout_rate: float,
num_group: int,
hidden_size: int
):
super(_Interactor, self).__init__()
self._in_planes = in_planes
self._kernel_size = kernel_size
self._dropout_rate = dropout_rate
self._hidden_size = hidden_size
self._num_group = num_group
self._dilation = 1
self._split_layer = _Splitter()
# psi(ψ) / phi(Φ) / eta(η) / rho(ρ)
self._psi = self._build_single_internal_module()
self._phi = self._build_single_internal_module()
self._eta = self._build_single_internal_module()
self._rho = self._build_single_internal_module()
def forward(self, x: paddle.Tensor) -> Tuple[paddle.Tensor, paddle.Tensor]:
"""
Perform convolutional and interactive process on even / odd sub-sequences, return processed sub-sequences.
Args:
x(paddle.Tensor): Input sequence to be forward processed.
Returns(Tuple[paddle.Tensor, paddle.Tensor]): A 2-element tuple, where 1-st element is processed even
sub-sequence, 2-nd element is processed odd sub-sequence.
"""
# (Batch_size, in_chunk_len, in_dim)
x_even, x_odd = self._split_layer(x)
# (Batch_size, in_dim, in_chunk_len)
x_even = paddle.transpose(x_even, perm=[0, 2, 1])
x_odd = paddle.transpose(x_odd, perm=[0, 2, 1])
x_scaled_even = paddle.multiply(x=x_even, y=paddle.exp(self._phi(x_odd)))
x_scaled_odd = paddle.multiply(x=x_odd, y=paddle.exp(self._psi(x_even)))
x_even_update = x_scaled_even + self._eta(x_scaled_odd)
x_odd_update = x_scaled_odd - self._rho(x_scaled_even)
# Return shape: (Batch_size, in_chunk_len, in_dim), (Batch_size, in_chunk_len, in_dim)
return paddle.transpose(x_even_update, perm=[0, 2, 1]), paddle.transpose(x_odd_update, perm=[0, 2, 1])
def _build_single_internal_module(self) -> paddle.nn.Sequential:
"""
Build an internal forward process module for current SCINet's interactive learning module. Each interactive
learning module contains 4 such internal sequential modules. Each of this internal sequential module
contains 5 submodules. The 1-st replication padding submodule is to keep the border shrunk caused by the
convolution operation. The 2-nd 1-D convolutional layer with self.kernel_size is to extend the input Channel
C to self.hidden_size * C. Followed by the 3-rd LeakyReLU and 4-th Dropout layer, the 5-th 1-D convolutional
layer is to recover the channel self.hidden_size * C to C.
Returns:
paddle.nn.Sequential: Built internal module for current SCINet's interactive learning module.
"""
if self._kernel_size % 2 == 0:
# by default: stride = 1
pad_l = self._dilation * (self._kernel_size - 2) // 2 + 1
pad_r = self._dilation * self._kernel_size // 2 + 1
else:
# we fix the kernel size of the second layer as 3.
pad_l = self._dilation * (self._kernel_size - 1) // 2 + 1
pad_r = self._dilation * (self._kernel_size - 1) // 2 + 1
prev_size = 1
layers = [
paddle.nn.Pad1D((pad_l, pad_r)),
paddle.nn.Conv1D(
in_channels=self._in_planes * prev_size,
out_channels=self._in_planes * self._hidden_size,
kernel_size=self._kernel_size,
dilation=self._dilation,
stride=1,
groups=self._num_group
),
paddle.nn.LeakyReLU(negative_slope=0.01),
paddle.nn.Dropout(p=self._dropout_rate),
paddle.nn.Conv1D(
in_channels=self._in_planes * self._hidden_size,
out_channels=self._in_planes,
kernel_size=3,
stride=1,
groups=self._num_group
),
paddle.nn.Tanh()
]
return paddle.nn.Sequential(*layers)
class _SCINetTree(paddle.nn.Layer):
"""
SCINet encode binary tree.
Args:
in_planes(int): The size of the loopback window, i.e., the number of time steps feed to the model.
current_level(int): tree level for current child tree. 0 means leaf node, max_tree_level means root node.
kernel_size(int): kernel size for Conv1D layer.
dropout_rate(float): dropout regularization parameter.
num_group(int): group number for Conv1D layer groups parameter.
hidden_size(int): The number of features in hidden state.
"""
def __init__(
self,
in_planes: int,
current_level: int,
kernel_size: int,
dropout_rate: float,
num_group: int,
hidden_size: int
):
super(_SCINetTree, self).__init__()
self._current_level = current_level
self._workingblock = _Interactor(
in_planes=in_planes,
kernel_size=kernel_size,
dropout_rate=dropout_rate,
num_group=num_group,
hidden_size=hidden_size
)
if self._current_level != 0:
self._scinet_tree_even = _SCINetTree(
in_planes=in_planes,
current_level=current_level - 1,
kernel_size=kernel_size,
dropout_rate=dropout_rate,
num_group=num_group,
hidden_size=hidden_size
)
self._scinet_tree_odd = _SCINetTree(
in_planes=in_planes,
current_level=current_level - 1,
kernel_size=kernel_size,
dropout_rate=dropout_rate,
num_group=num_group,
hidden_size=hidden_size
)
def _concat_and_realign(self, even, odd) -> paddle.Tensor:
"""
Internal method, concatenate input even and odd sub-sequences and realign the timesteps to original order.
Args:
even(paddle.Tensor): Input even sub-sequence to be concatenated, e.g., [0, 2, 4, 6]
odd(paddle.Tensor): Input odd sub-sequence to be concatenated, e.g., [1, 3, 5]
Returns:
paddle.Tensor: concatenated new sequence with timestep order realigned, e.g., [0, 1, 2, 3, 4, 5, 6].
"""
# T_seq_len, Batch, Dim
even = paddle.transpose(even, perm=[1, 0, 2])
odd = paddle.transpose(odd, perm=[1, 0, 2])
even_len = even.shape[0]
odd_len = odd.shape[0]
min_len = min((odd_len, even_len))
all_time_steps = []
for i in range(min_len):
all_time_steps.append(even[i].unsqueeze(0))
all_time_steps.append(odd[i].unsqueeze(0))
if odd_len < even_len:
# Given:
# full seq = [0, 1, 2, 3, 4]
# Thus:
# even = [0, 2, 4]
# odd = [1, 3]
# thus odd_len (2) < even_len (3)
# Note that even_len < odd_len will NEVER occur, because index always start with an even number (zero).
all_time_steps.append(even[-1].unsqueeze(0))
concat_all_time_steps = paddle.concat(all_time_steps, axis=0)
# Batch, T_seq_len, Dim
return paddle.transpose(concat_all_time_steps, perm=[1, 0, 2])
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
"""
Forward compute function for SCINet encode tree network.
Args:
x(paddle.Tensor): input tensor to be forward computed.
Returns:
paddle.Tensor: forward processed tensor output.
"""
x_even_update, x_odd_update = self._workingblock(x)
# reorder odd/even sub-sequences recursively.
if self._current_level == 0:
# leaf nodes.
return self._concat_and_realign(x_even_update, x_odd_update)
else:
# non-leaf nodes, call zip_up recursively until reach leaf nodes.
return self._concat_and_realign(self._scinet_tree_even(x_even_update), self._scinet_tree_odd(x_odd_update))
class _StackedSCINetModule(paddle.nn.Layer):
"""
Stacked SCINet, contains one or more SCINet(s).
Args:
in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model.
out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
in_dim(int): input feature dimensions, contain both target dim + (possibly be None) known cov dim +
(possibly be None) observed cov dim.
num_stack(int): stack number in Stacked SCINet.
num_level(int): scinet tree level.
num_decoder_layer(int): decoder layer number.
concat_len(int): length to concat per stack.
kernel_size(int): kernel size for Conv1D layer.
dropout_rate(float): dropout regularization parameter.
num_group(int): group number for Conv1D layer groups parameter.
hidden_size(int): The number of features in hidden state.
"""
def __init__(
self,
in_chunk_len: int,
out_chunk_len: int,
in_dim: int,
num_stack: int = 1,
num_level: int = 3,
num_decoder_layer: int = 1,
concat_len: int = 0,
kernel_size: int = 5,
dropout_rate: float = 0.5,
num_group: int = 1,
hidden_size: int = 1
):
super(_StackedSCINetModule, self).__init__()
self._in_chunk_len = in_chunk_len
self._out_chunk_len = out_chunk_len
self._in_dim = in_dim
self._hidden_size = hidden_size
self._num_stack = num_stack
self._num_level = num_level
self._num_decoder_layer = num_decoder_layer
self._concat_len = concat_len
self._num_group = num_group
self._kernel_size = kernel_size
self._dropout_rate = dropout_rate
div_num = 6
self._overlap_len = self._in_chunk_len // 4
self._div_len = self._in_chunk_len // div_num
self._encoder1 = _SCINetTree(
in_planes=self._in_dim,
current_level=self._num_level - 1,
kernel_size=self._kernel_size,
dropout_rate=self._dropout_rate,
num_group=self._num_group,
hidden_size=self._hidden_size
)
# only implement two stacks at most.
if self._num_stack == 2:
self._encoder2 = _SCINetTree(
in_planes=self._in_dim,
current_level=self._num_level - 1,
kernel_size=self._kernel_size,
dropout_rate=self._dropout_rate,
num_group=self._num_group,
hidden_size=self._hidden_size
)
self._decoder1 = paddle.nn.Conv1D(
in_channels=self._in_chunk_len,
out_channels=self._out_chunk_len,
kernel_size=1,
stride=1
)
self._div_projection = paddle.nn.LayerList()
if self._num_decoder_layer > 1:
self._decoder1 = paddle.nn.Linear(self._in_chunk_len, self._out_chunk_len)
for layer_idx in range(self._num_decoder_layer - 1):
div_projection = paddle.nn.LayerList()
for i in range(div_num):
lens = min(i * self._div_len + self._overlap_len, self._in_chunk_len) - i * self._div_len
div_projection.append(paddle.nn.Linear(lens, self._div_len))
self._div_projection.append(div_projection)
if self._num_stack == 2:
if self._concat_len > 0:
in_channels = self._concat_len + self._out_chunk_len
else:
in_channels = self._in_chunk_len + self._out_chunk_len
self._decoder2 = paddle.nn.Conv1D(
in_channels=in_channels,
out_channels=self._out_chunk_len,
kernel_size=1
)
def forward(self, x_dict: Dict[str, paddle.Tensor]) -> Tuple[paddle.Tensor, Optional[paddle.Tensor]]:
"""
Stacked SCINet network.
Args:
x_dict(Dict[str, paddle.Tensor]): key-value formatted samples, built by data_adapter.
Returns:
Tuple[paddle.Tensor, Optional[paddle.Tensor]]: forward processed output with two-element tuple shape, where
1-st element is the final output from the last stack, the 2-nd element is the intermediate output from
previous stack.
"""
# [target_features, observed_features, past_known_features]
x = x_dict["past_target"]
if "observed_cov_numeric" in x_dict.keys():
x = paddle.concat(x=(x, x_dict["observed_cov_numeric"]), axis=2)
if "known_cov_numeric" in x_dict.keys():
# x_dict["known_cov_numeric"].shape = (batch_size, in_chunk_len + out_chunk_len, known_dim)
# past_known.shape = (batch_size, in_chunk_len, known_dim)
past_known = x_dict["known_cov_numeric"][:, :self._in_chunk_len, ]
x = paddle.concat(x=(x, past_known), axis=2)
# the first stack
res1 = x
x = self._encoder1(x)
x += res1
if self._num_decoder_layer == 1:
# (B, in_chunk_len, D) -> decoder -> (B, out_chunk_len, D)
x = self._decoder1(x)
else:
x = paddle.transpose(x, perm=[0, 2, 1])
for div_projection in self._div_projection:
output = paddle.zeros(x.shape, dtype=x.dtype)
for i, div_layer in enumerate(div_projection):
div_x = x[:, :, i * self._div_len:min(i * self._div_len + self._overlap_len, self._in_chunk_len)]
output[:, :, i * self._div_len:(i + 1) * self._div_len] = div_layer(div_x)
x = output
x = self._decoder1(x)
x = paddle.transpose(x, perm=[0, 2, 1])
if self._num_stack == 1:
return x, None
# self._num_stack == 2
mid_output = x
if self._concat_len > 0:
x = paddle.concat(x=(res1[:, -self._concat_len:, :], x), axis=1)
else:
# (B, out_chunk_len, D) -> concat -> (B, in_chunk_len + out_chunk_len, D)
x = paddle.concat(x=(res1, x), axis=1)
# the second stack
res2 = x
x = self._encoder2(x)
x += res2
# (B, in_chunk_len + out_chunk_len, D) -> decoder2 -> (B, out_chunk_len, D)
x = self._decoder2(x)
return x, mid_output
[docs]class SCINetModel(PaddleBaseModelImpl):
"""
DownSampled Convolutional Interactive Network (SCINet) for time series forcasting.
Refers to `SCINet <https://arxiv.org/pdf/2106.09305.pdf>`_ .
Args:
in_chunk_len(int): The size of the loopback window, i.e. the number of time steps feed to the model.
out_chunk_len(int): The size of the forecasting horizon, i.e. the number of time steps output by the model.
skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample.
The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample.
By default, it will NOT skip any time steps.
sampling_stride(int): Time steps to stride over the i-th sample and (i+1)-th sample. More precisely,
let `t` be the time index of target time series, `t[i]` be the start time of the i-th sample,
`t[i+1]` be the start time of the (i+1)-th sample, thus `sampling_stride` is equal to `t[i+1] - t[i]`.
loss_fn(Callable[..., paddle.Tensor]|None): Loss function.
optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
optimizer_params(Dict[str, Any], optional): Optimizer parameters.
eval_metrics(List[str]|List[Metric], optional): Evaluation metrics of model.
callbacks(List[Callback], optional): Customized callback functions.
batch_size(int): Number of samples per batch.
max_epochs(int): Max training epochs.
verbose(int): Verbosity mode.
patience(int): Number of epochs to wait for improvement before terminating.
seed(int|None): Global random seed.
num_stack(int): stack number in Stacked SCINet.
num_level(int): scinet tree level.
num_decoder_layer(int): decoder layer number.
concat_len(int): length to concat per stack.
kernel_size(int): kernel size for Conv1D layer.
dropout_rate(float): dropout regularization parameter.
num_group(int): group number for Conv1D layer groups parameter.
hidden_size(int): The number of features in hidden state for SCINet Interactor module.
"""
def __init__(
self,
in_chunk_len: int,
out_chunk_len: int,
skip_chunk_len: int = 0,
sampling_stride: int = 1,
loss_fn: Callable[..., paddle.Tensor] = paddle.nn.functional.mse_loss,
optimizer_fn: Callable[..., paddle.optimizer.Optimizer] = paddle.optimizer.Adam,
optimizer_params: Optional[Dict[str, Any]] = None,
eval_metrics: Optional[List[str]] = None,
callbacks: Optional[List[Callback]] = None,
batch_size: int = 8,
max_epochs: int = 100,
verbose: int = 1,
patience: int = 10,
seed: Optional[int] = None,
num_stack: int = 1,
num_level: int = 3,
num_decoder_layer: int = 1,
concat_len: int = 0,
kernel_size: int = 5,
dropout_rate: float = 0.5,
num_group: int = 1,
hidden_size: int = 1
):
super(SCINetModel, self).__init__(
in_chunk_len=in_chunk_len,
out_chunk_len=out_chunk_len,
skip_chunk_len=skip_chunk_len,
sampling_stride=sampling_stride,
loss_fn=loss_fn,
optimizer_fn=optimizer_fn,
optimizer_params=optimizer_params,
eval_metrics=eval_metrics,
callbacks=callbacks,
batch_size=batch_size,
max_epochs=max_epochs,
verbose=verbose,
patience=patience,
seed=seed
)
if self._optimizer_params is None:
self._optimizer_params = dict(learning_rate=1e-3)
if self._eval_metrics is None:
self._eval_metrics = list()
if self._callbacks is None:
self._callbacks = list()
self._hidden_size = hidden_size
self._num_stack = num_stack
self._num_level = num_level
self._num_decoder_layer = num_decoder_layer
self._concat_len = concat_len
self._num_group = num_group
self._kernel_size = kernel_size
self._dropout_rate = dropout_rate
raise_if_not(0 < num_stack <= 2, f"The number of stack can only be 1 or 2, got {num_stack}.")
# limit the recursion depth.
raise_if(
self._in_chunk_len % (np.power(2, self._num_level)) != 0,
f"in_chunk_len % 2**num_level must != 0. Actual " +
f"in_chunk_len = {self._in_chunk_len}, " +
f"num_level = {self._num_level}, " +
f"in_chunk_len % 2**num_level = ({self._in_chunk_len % (np.power(2, self._num_level))})."
)
def _check_tsdataset(self, tsdataset: TSDataset) -> None:
"""
SCINet only allows float32 type variables, any int-like variables are not allowed.
Args:
tsdataset(TSDataset): input TSDataset to be checked.
"""
for column, dtype in tsdataset.dtypes.items():
raise_if_not(
np.issubdtype(dtype, np.floating),
f"scinet variables' dtype only supports [float16, float32, float64], "
f"but received {column}: {dtype}."
)
super(SCINetModel, self)._check_tsdataset(tsdataset)
def _update_fit_params(
self,
train_tsdataset: List[TSDataset],
valid_tsdataset: Optional[List[TSDataset]] = None
) -> Dict[str, Any]:
"""
Build fit parameters.
Args:
train_tsdataset(List[TSDataset]): list of train dataset.
valid_tsdataset(List[TSDataset]|None): list of validation dataset.
Returns:
Dict[str, Any]: model parameters
"""
tsdataset = train_tsdataset[0]
target_dim = tsdataset.target.data.shape[1]
# observed_num_dim = observed cov numeric dimension
observed_num_dim = 0
if tsdataset.observed_cov is not None:
observed_num_dim = tsdataset.observed_cov.data.shape[1]
# known_num_dim = known cov numeric dimension
known_num_dim = 0
if tsdataset.known_cov is not None:
known_num_dim = tsdataset.known_cov.data.shape[1]
# Because self._check_tsdataset already guarantee that only numeric features are supported, so
# here the below dims only contain numeric dim, but not contain categorical dims.
fit_params = {
"target_dim": target_dim,
"observed_num_dim": observed_num_dim,
"known_num_dim": known_num_dim
}
return fit_params
def _init_network(self) -> paddle.nn.Layer:
"""
Initialize the SCINet network.
Returns:
paddle.nn.Layer: Initialized network.
"""
in_dim = self._fit_params["target_dim"] + \
self._fit_params["observed_num_dim"] + \
self._fit_params["known_num_dim"]
return _StackedSCINetModule(
in_chunk_len=self._in_chunk_len,
out_chunk_len=self._out_chunk_len,
in_dim=in_dim,
num_stack=self._num_stack,
num_level=self._num_level,
num_decoder_layer=self._num_decoder_layer,
concat_len=self._concat_len,
kernel_size=self._kernel_size,
dropout_rate=self._dropout_rate,
num_group=self._num_group,
hidden_size=self._hidden_size
)
def _init_metrics(self, eval_names: List[str]) -> Tuple[List[Metric], List[str], Dict[str, MetricContainer]]:
"""
Set attributes relative to the metrics.
Args:
eval_names(List[str]): List of eval set names.
Returns:
List[Metric]: List of metric instance.
List[str]: List of metric names.
Dict[str, MetricContainer]: Dict of metric container.
"""
metrics = self._eval_metrics
metric_container_dict = OrderedDict()
for name in eval_names:
metric_container_dict.update({name: MetricContainer(metrics, prefix=f"{name}_")})
if self._num_stack == 2:
# add metric container for SCINet mid predict output.
metric_container_dict.update({f"{name}_mid": MetricContainer(metrics, prefix=f"{name}_mid_")})
metrics, metrics_names = [], []
for _, metric_container in metric_container_dict.items():
metrics.extend(metric_container._metrics)
metrics_names.extend(metric_container._names)
return metrics, metrics_names, metric_container_dict
def _compute_loss(
self,
y_score: Tuple[paddle.Tensor, Optional[paddle.Tensor]],
y_true: paddle.Tensor
) -> paddle.Tensor:
"""
Internal method, compute loss for current batch.
Args:
y_score(paddle.Tensor): predicted y for current batch.
y_true(paddle.Tensor): ground truth for current batch.
Returns:
paddle.Tensor: computed loss for current batch.
"""
# y_score = tuple, where tuple[0] is final pred res, tuple[1] is intermediate pred res from previous stack.
y_pred, mid_pred = y_score
loss = self._loss_fn(y_pred[:, :, :self._fit_params["target_dim"]], y_true)
# pre-check already guarantee that num_stack can only be either 1 or 2.
if self._num_stack == 1:
# not contain mid output
return loss
if self._num_stack == 2:
# contain mid output
mid_loss = self._loss_fn(mid_pred[:, :, :self._fit_params["target_dim"]], y_true)
return loss + mid_loss
raise_log(exception=ValueError(f"num_stack ({self._num_stack}) must be either 1 or 2."), logger=logger)
def _predict(self, dataloader: paddle.io.DataLoader) -> np.ndarray:
"""
Predict function core logic.
SCINet will return a tuple of tensors = (pred, mid_pred), where mid_pred is the intermediate output
from previous stack, the mid_pred is used for computing loss, so here only need to append tuple[0] and discard
tuple[1].
Args:
dataloader(paddle.io.DataLoader): Data to be predicted.
Returns:
np.ndarray: predicted ndarray matrix.
"""
self._network.eval()
results = []
for batch_nb, data in enumerate(dataloader):
x, _ = self._prepare_X_y(data)
# output.shape = (out_chunk_len, in_dim)
output, _ = self._network(x)
predictions = output.numpy()
results.append(predictions)
# results.shape = (batch_size, out_chunk_len, in_dim)
results = np.vstack(results)
# Note: the pre-logic already guarantee that the feature layout is as follows (most left is target, middle is
# observed cov, most right is known cov):
# x = [target_features, observed_features, known_features]
# results = [target_pred_res, observed_pred_res, known_pred_res]
# As we only predict target, but not predict co-variates, so here we only cut and return target predict res.
return results[:, :, :self._fit_params["target_dim"]]
def _predict_epoch(self, name: str, loader: paddle.io.DataLoader) -> None:
"""
Predict an epoch and update metrics.
Args:
name(str): Name of the validation set.
loader(paddle.io.DataLoader): DataLoader with validation set.
"""
self._network.eval()
y_true_list = []
y_pred_list = []
mid_pred_list = []
for batch_idx, data in enumerate(loader):
# y_true_batch.shape = (batch_size, out_chunk_len, target_dim)
X, y_true_batch = self._prepare_X_y(data)
# y_pred_batch.shape = (batch_size, out_chunk_len, in_dim)
# mid_pred_batch.shape = (batch_size, out_chunk_len, in_dim)
y_pred_batch, mid_pred_batch = self._predict_batch(X)
y_true_list.append(y_true_batch)
y_pred_list.append(y_pred_batch)
mid_pred_list.append(mid_pred_batch)
y_true = np.vstack(y_true_list)
# y_true.shape[2] = target_dim + observed_cov_col_num
# y_pred.shape[2] mid_pred.shape[2] = target_dim
y_pred = np.vstack(y_pred_list)[:, :, :self._fit_params["target_dim"]]
y_pred_metrics_logs = self._metric_container_dict[name](y_true, y_pred)
self._history._epoch_metrics.update(y_pred_metrics_logs)
if self._num_stack == 2:
mid_pred = np.vstack(mid_pred_list)[:, :, :self._fit_params["target_dim"]]
mid_pred_metrics_logs = self._metric_container_dict[f"{name}_mid"](y_true, mid_pred)
self._history._epoch_metrics.update(mid_pred_metrics_logs)
self._network.train()
def _predict_batch(self, X: paddle.Tensor) -> Tuple[np.ndarray, np.ndarray]:
"""
Predict one batch of data.
SCINet will return tuple(pred, mid_pred), where mid_pred is used for computing loss. Here only need
Args:
X(paddle.Tensor): Feature tensor.
Returns:
Tuple[np.ndarray, np.ndarray]: Prediction results, where tuple[0] is final pred result, tuple[1] is
intermediate (previous stack) pred result.
"""
y_pred, mid_pred = self._network(X)
if self._num_stack == 1:
# mid_pred is None when num_stack == 1, thus cannot call .numpy() for a NoneType Object.
return y_pred.numpy(), mid_pred
return y_pred.numpy(), mid_pred.numpy()