#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from typing import List, Dict, Any, Callable, Optional
import numpy as np
import paddle
import tqdm
from paddle.optimizer import Optimizer
import paddle.nn.functional as F
from paddlets.models.representation.dl._cost.utils import (
create_cost_inputs,
create_contrastive_inputs,
custom_collate_fn,
)
from paddlets.models.representation.dl._cost.losses import (
time_contrastive_loss,
frequency_contrastive_loss,
convert_coefficient
)
from paddlets.models.representation.dl._cost.encoder import TSEncoder
from paddlets.models.representation.dl._cost.swa import AveragedModel
from paddlets.models.representation.dl.repr_base import ReprBaseModel
from paddlets.models.data_adapter import DataAdapter
from paddlets.models.common.callbacks import Callback
from paddlets.datasets import TSDataset
from paddlets.logger import raise_if_not
class _CoSTModule(paddle.nn.Layer):
"""Paddle layer implementing CoST.
Args:
in_channels(int): The number of channels in the input series.
out_channels(int): The number of channels in the output series.
hidden_channels(int): The number of channels in the hidden layer.
num_layers(int): The number of `ConvLayer` to be stacked.
segment_size(int): The size of time series segment.
queue_size(int): The dynamic queue size for saving negative examples.
temperature(float): The temperature coefficient.
alpha(float): The weight of seasonal component in the loss.
Attributes:
_feat_extractor(paddle.nn.Layer): A stacked LayerList containing `DilatedConvLayer`.
_avg_extractor(paddle.nn.Layer): An averaged model of `_feat_extractor` for Stochastic Weight Averaging (SWA).
_out_proj(paddle.nn.Layer): A projection head, widely used for contrastive learning.
_avg_head(paddle.nn.Layer): An averaged model of `_out_proj` for Stochastic Weight Averaging (SWA).
_queue_size(int): The dynamic queue size for saving negative examples.
_temperature(float): The temperature coefficient.
_alpha(float): The parameter control the weightage of seasonal components in loss.
"""
def __init__(
self,
in_channels: int,
out_channels: int,
hidden_channels: int,
num_layers: int,
segment_size: int,
queue_size: int,
temperature: float,
alpha: float
):
super(_CoSTModule, self).__init__()
self._feat_extractor = TSEncoder(
in_channels=in_channels,
out_channels=out_channels,
hidden_channels=hidden_channels,
num_layers=num_layers,
seq_len=segment_size
) # feature extractor
k = np.sqrt(1. / (out_channels // 2))
weight_attr = bias_attr = paddle.ParamAttr(
initializer=paddle.nn.initializer.Uniform(-k, k)
)
dim = (out_channels // 2)
self._out_proj = paddle.nn.Sequential(
paddle.nn.Linear(dim, dim, weight_attr, bias_attr),
paddle.nn.ReLU(),
paddle.nn.Linear(dim, dim, weight_attr, bias_attr),
) # projection head
self._avg_extractor = AveragedModel(self._feat_extractor)
self._avg_extractor.update_parameters(self._feat_extractor)
self._avg_proj = AveragedModel(self._out_proj)
self._avg_proj.update_parameters(self._out_proj)
self._alpha = alpha
self._queue_size = queue_size
self._temperature = temperature
queue = paddle.randn((out_channels // 2, queue_size))
self.register_buffer("_queue", F.normalize(queue, axis=0))
self.register_buffer("_ptr", paddle.zeros([1], dtype="int64"))
def _dequeue_and_enqueue(
self,
keys: paddle.Tensor
):
"""With reference to MoCO implementation, maintian a dynamic queue for saving negtive examples.
Args:
keys: The negative example of waiting to enter the queue.
"""
ptr, offset = self._ptr[0], keys.shape[0]
self._queue[:, ptr: ptr + offset] = keys.T
self._ptr[0] = (ptr + offset) % self._queue_size
def forward(
self,
X: paddle.Tensor,
mask: Optional[str],
) -> paddle.Tensor:
"""Forward.
Args:
X(paddle.Tensor): The input of CoST's feature extractor.
mask(str): The mask type, ["binomial", "all_true"] is optional.
Returns:
paddle.Tensor: Out of model or loss value.
"""
if not self.training:
return self._feat_extractor(X, mask)
# Generate two augmented samples.
aug1 = create_contrastive_inputs(X)
aug2 = create_contrastive_inputs(X)
trend1, season1 = self._feat_extractor(aug1, mask)
trend2, _ = self._avg_extractor(aug2, mask)
_, season2 = self._feat_extractor(aug2, mask)
# Trend: contrastive learning(reference MoCO) in time domain.
# In order to calculate efficiency, a certain time step is randomly sampled.
batch_size, seq_len, channels = trend1.shape
axis0 = np.arange(batch_size)[:, None, None]
axis1 = np.random.randint(0, seq_len, (1, ))[None, :, None]
axis2 = np.arange(channels)[None, None, :]
trend1 = paddle.squeeze(trend1[axis0, axis1, axis2], 1)
trend1 = self._out_proj(trend1)
trend1 = F.normalize(trend1, axis=-1)
season1 = F.normalize(season1, axis=-1)
trend2 = paddle.squeeze(trend2[axis0, axis1, axis2], 1)
trend2 = self._avg_proj(trend2)
trend2 = F.normalize(trend2, axis=-1)
season2 = F.normalize(season2, axis=-1)
# Season: contrastive learning in frequency domain.
season1_freq = paddle.fft.rfft(season1, axis=1)
season2_freq = paddle.fft.rfft(season2, axis=1)
season1_amp, season1_phase = convert_coefficient(season1_freq)
season2_amp, season2_phase = convert_coefficient(season2_freq)
trend_loss = time_contrastive_loss(
trend1, trend2, paddle.assign(self._queue.detach()), self._temperature
)
season_loss = frequency_contrastive_loss(season1_amp, season2_amp)
season_loss += frequency_contrastive_loss(season1_phase, season2_phase)
self._dequeue_and_enqueue(trend2)
return trend_loss + (self._alpha * season_loss / 2.)
def parameters(self):
"""Returns a list of all Parameters from current layer and its sub-layers.
"""
return self._feat_extractor.parameters() + self._out_proj.parameters()
[docs]class CoST(ReprBaseModel):
"""CoST\[1\] is a time series representation model published in ICLR 2022,
It is a new time series representation learning framework for long sequence time series forecasting,
which applies the contrastive learning method to learn disentangled seasonal-trend representations.
CoST comprises both time domain and frequency domain contrastive losses to learn
discriminative trend and seasonal representations, respectively.
\[1\] Woo G, et al. "CoST: Contrastive Learning of Disentangled Seasonal-Trend Representations
for Time Series Forecasting", `<https://arxiv.org/pdf/2202.01575.pdf>`_
Args:
segment_size(int): The size of time series segment.
sampling_stride(int): Sampling intervals between two adjacent samples.
optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
optimizer_params(Dict[str, Any]): Optimizer parameters.
callbacks(List[Callback]): Customized callback functions.
batch_size(int): Number of samples per batch.
max_epochs(int): Max epochs during training.
verbose(int): Verbosity mode.
seed(int|None): Global random seed.
repr_dims(int): The dimension of representation.
hidden_dims(int): The number of channels in the hidden layer.
num_layers(int): The number of `ConvLayer` to be stacked.
queue_size(int): The dynamic queue size for saving negative examples.
temperature(float): The temperature coefficient.
alpha(float): The weight of seasonal components in loss.
Attributes:
_segment_size(int): The size of time series segment.
_sampling_stride(int): Sampling intervals between two adjacent samples.
_optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
_optimizer_params(Dict[str, Any]): Optimizer parameters.
_callbacks(List[Callback]): Customized callback functions.
_batch_size(int): Number of samples per batch.
_max_epochs(int): Max epochs during training.
_verbose(int): Verbosity mode.
_seed(int|None): Global random seed.
_repr_dims(int): The dimension of representation.
_hidden_dims(int): The number of channels in the hidden layer.
_num_layers(int): The number of `ConvLayer` to be stacked.
_queue_size(int): The dynamic queue size for saving negative examples.
_temperature(float): The temperature coefficient.
_alpha(float): The parameter control the weightage of seasonal components in loss.
"""
def __init__(
self,
segment_size: int,
sampling_stride: int = 1,
optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.Momentum,
optimizer_params: Dict[str, Any] = dict(learning_rate=1e-3),
callbacks: List[Callback] = [],
batch_size: int = 128,
max_epochs: int = 10,
verbose: int = 1,
seed: Optional[int] = None,
repr_dims: int = 320,
hidden_dims: int = 64,
num_layers: int = 10,
queue_size: int = 256,
temperature: float = 0.07,
alpha: float = 5e-4
):
raise_if_not(
queue_size % batch_size == 0,
f"queue_size must be divisible by batch_size." \
)
self._segment_size = segment_size
self._repr_dims = repr_dims
self._hidden_dims = hidden_dims
self._num_layers = num_layers
self._queue_size = queue_size
self._temperature = temperature
self._alpha = alpha
super(CoST, self).__init__(
segment_size=segment_size,
sampling_stride=sampling_stride,
optimizer_fn=optimizer_fn,
optimizer_params=optimizer_params,
callbacks=callbacks,
batch_size=batch_size,
max_epochs=max_epochs,
verbose=verbose,
seed=seed,
)
def _check_tsdataset(
self,
tsdataset: TSDataset
):
"""Ensure the robustness of input data (consistent feature order), at the same time,
check whether the data types are compatible. If not, the processing logic is as follows:
1> Integer: Convert to np.int64.
2> Floating: Convert to np.float32.
3> Missing value: Warning.
4> Other: Illegal.
Args:
tsdataset(TSDataset): Data to be checked.
"""
target_columns = tsdataset.get_target().dtypes.keys()
for column, dtype in tsdataset.dtypes.items():
if column in target_columns:
raise_if_not(
np.issubdtype(dtype, np.floating),
f"CoST's target dtype only supports [float16, float32, float64], " \
f"but received {column}: {dtype}."
)
continue
raise_if_not(
np.issubdtype(dtype, np.floating),
f"CoST's cov(observed or known) dtype currently only supports [float16, float32, float64], " \
f"but received {column}: {dtype}."
)
super(CoST, self)._check_tsdataset(tsdataset)
def _update_fit_params(
self,
train_tsdataset: List[TSDataset],
) -> Dict[str, Any]:
"""Infer parameters by TSdataset automatically.
Args:
train_tsdataset(TSDataset): train dataset.
Returns:
Dict[str, Any]: model parameters.
"""
train_tsdataset = train_tsdataset[0]
input_dim = train_tsdataset.get_target().data.shape[1]
if train_tsdataset.get_observed_cov():
input_dim += train_tsdataset.get_observed_cov().data.shape[1]
if train_tsdataset.get_known_cov():
input_dim += train_tsdataset.get_known_cov().data.shape[1]
fit_params = {
"input_dim": input_dim
}
return fit_params
def _init_fit_dataloader(
self,
train_tsdataset: List[TSDataset]
) -> paddle.io.DataLoader:
"""Generate dataloader for train set.
Args:
train_tsdataset(List[TSDataset]): Train set.
Returns:
paddle.io.DataLoader: Training dataloader.
"""
data_adapter, samples = DataAdapter(), []
for tsdataset in train_tsdataset:
self._check_tsdataset(tsdataset)
dataset = data_adapter.to_sample_dataset(
rawdataset=tsdataset,
in_chunk_len=self._segment_size,
sampling_stride=self._sampling_stride,
fill_last_value=np.nan
)
samples.extend(dataset.samples)
# In order to align with the paper of CoST, a customized data organization is required.
samples = custom_collate_fn(samples)
dataset.samples = samples * (
1 if len(samples) >= self._batch_size else int(np.ceil(self._batch_size / len(samples)))
)
return data_adapter.to_paddle_dataloader(dataset, self._batch_size, drop_last=True)
def _init_network(self) -> paddle.nn.Layer:
"""Setup the network.
Returns:
paddle.nn.Layer
"""
return _CoSTModule(
in_channels=self._fit_params["input_dim"],
out_channels=self._repr_dims,
hidden_channels=self._hidden_dims,
num_layers=self._num_layers,
segment_size=self._segment_size,
queue_size=self._queue_size,
temperature=self._temperature,
alpha=self._alpha
)
def _train_batch(
self,
X: Dict[str, paddle.Tensor],
) -> Dict[str, Any]:
"""Trains one batch of data.
Args:
X(Dict[str, paddle.Tensor]): Dict of feature tensor.
Returns:
Dict[str, Any]: Dict of logs.
"""
batch_logs = super(CoST, self)._train_batch(X)
self._network._avg_extractor.update_parameters(
self._network._feat_extractor
)
self._network._avg_proj.update_parameters(
self._network._out_proj
)
return batch_logs
def _compute_loss(
self,
X: Dict[str, paddle.Tensor]
) -> paddle.Tensor:
"""Compute the loss.
Args:
X(Dict[str, paddle.Tensor]): Dict of feature tensor.
Returns:
paddle.Tensor: Loss value.
"""
feats = create_cost_inputs(X)
if feats.shape[1] > self._segment_size:
offset = np.random.randint(feats.shape[1] - self._segment_size + 1)
feats = feats[:, offset: offset + self._segment_size]
loss = self._network(feats, mask="binomial")
return loss
def _encode(
self,
dataloader: paddle.io.DataLoader,
batch_size: Optional[int] = None,
verbose: bool = True
) -> np.ndarray:
"""Encode function core logic.
Args:
dataloader(paddle.io.DataLoader): The data to be encoded.
batch_size(int): The batch size used for inference. If not specified,
this would be the same batch size as training.
verbose(bool): Turn on Verbose mode,set to true by default.
Returns:
np.ndarray: The representations for input series.
"""
def _encode_one_batch(buffer):
"""Encode one batch of data.
"""
padding_tensor = paddle.concat(buffer, axis=0)
trend, season = self._network(padding_tensor, mask="all_true")
out = paddle.concat([trend, season], axis=-1)
out = out[:, -1:, :].cpu()
out = paddle.transpose(out, perm=[1, 0, 2])
buffer.clear()
return out
batch_size = (
self._batch_size if batch_size is None else batch_size
)
data = iter(dataloader).next()
feats = create_cost_inputs(data)
self._network.eval()
seq_len, buffer, reprs = feats.shape[1], [], []
for timestamp_idx in tqdm.tqdm(range(seq_len), disable=not verbose):
start = timestamp_idx - (self._segment_size - 1)
end = timestamp_idx + 1
padding_left = (-start if start < 0 else 0)
padding_right = (end - seq_len if end > seq_len else 0)
padding_tensor = F.pad(
feats[:, max(start, 0): min(end, seq_len), :],
(padding_left, padding_right),
value=np.nan,
data_format="NLC"
)
# Accumulate a batch and encode together.
buffer.append(padding_tensor)
if len(buffer) < batch_size:
continue
out = _encode_one_batch(buffer)
reprs.append(out)
if len(buffer) > 0:
out = _encode_one_batch(buffer)
reprs.append(out)
out = paddle.concat(reprs, axis=1)
return out.numpy()