Source code for paddlets.models.representation.dl.ts2vec

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

from typing import List, Dict, Any, Callable, Optional 

import numpy as np
import paddle
import tqdm

from paddle.optimizer import Optimizer
import paddle.nn.functional as F

from paddlets.models.representation.dl._ts2vec.utils import (
    create_ts2vec_inputs,
    create_contrastive_inputs,
    instance_level_encoding,
    multiscale_encoding,
    custom_collate_fn,
)
from paddlets.models.representation.dl._ts2vec.losses import hierarchical_contrastive_loss
from paddlets.models.representation.dl._ts2vec.encoder import TSEncoder
from paddlets.models.representation.dl._ts2vec.swa import AveragedModel
from paddlets.models.representation.dl.repr_base import ReprBaseModel
from paddlets.models.data_adapter import DataAdapter
from paddlets.models.common.callbacks import Callback
from paddlets.datasets import TSDataset
from paddlets.logger import raise_if_not


class _TS2VecModule(paddle.nn.Layer):
    """Paddle layer implementing TS2Vec.

    Args:
        in_channels(int): The number of channels in the input series.
        out_channels(int): The number of channels in the output series.
        hidden_channels(int): The number of channels in the hidden layer.
        num_layers(int): The number of `ConvLayer` to be stacked.
    
    Attributes:
        _feat_extractor(paddle.nn.Layer): A stacked LayerList containing `DilatedConvLayer`.
        _avg_extractor(paddle.nn.Layer): An averaged model of `_feat_extractor` for Stochastic Weight Averaging (SWA).
    """
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        hidden_channels: int,
        num_layers: int,
    ):
        super(_TS2VecModule, self).__init__()
        self._feat_extractor = TSEncoder(
            in_channels=in_channels,
            out_channels=out_channels,
            hidden_channels=hidden_channels,
            num_layers=num_layers
        )
        self._avg_extractor = AveragedModel(self._feat_extractor)
        self._avg_extractor.update_parameters(self._feat_extractor)

    def forward(
        self, 
        X: paddle.Tensor,
        mask: Optional[str],
    ) -> paddle.Tensor:
        """Forward.

        Args:
            X(paddle.Tensor): The input of TS2Vec's feature extractor.
            mask(str): The mask type, ["binomial", "all_true", "mask_last"] is optional.

        Returns:
            paddle.Tensor: Out of model.
        """
        if self.training:
            return self._feat_extractor(X, mask)
        return self._avg_extractor(X, mask)
    
    def parameters(self):
        """Returns a list of all Parameters from current layer and its sub-layers.
        """
        return self._feat_extractor.parameters()


[docs]class TS2Vec(ReprBaseModel): """TS2Vec\[1\] is a time series representation model introduced in 2021, It is a universal framework for learning representations of time series in an arbitrary semantic level. TS2Vec performs contrastive learning in a hierarchical way over augmented context views, which enables a robust contextual representation for each timestamp. \[1\] Yue Z, et al. "TS2Vec: Towards universal representation of time series", `<https://arxiv.org/abs/2106.10466>`_ Args: segment_size(int): The size of time series segment. sampling_stride(int): Sampling intervals between two adjacent samples. optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm. optimizer_params(Dict[str, Any]): Optimizer parameters. callbacks(List[Callback]): Customized callback functions. batch_size(int): Number of samples per batch. max_epochs(int): Max epochs during training. verbose(int): Verbosity mode. seed(int|None): Global random seed. repr_dims(int): The dimension of representation. hidden_dims(int): The number of channels in the hidden layer. num_layers(int): The number of `ConvLayer` to be stacked. temporal_unit(int): The minimum unit to perform temporal contrast. Attributes: _segment_size(int): The size of time series segment. _sampling_stride(int): Sampling intervals between two adjacent samples. _optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm. _optimizer_params(Dict[str, Any]): Optimizer parameters. _callbacks(List[Callback]): Customized callback functions. _batch_size(int): Number of samples per batch. _max_epochs(int): Max epochs during training. _verbose(int): Verbosity mode. _seed(int|None): Global random seed. _repr_dims(int): The dimension of representation. _hidden_dims(int): The number of channels in the hidden layer. _num_layers(int): The number of `ConvLayer` to be stacked. _temporal_unit(int): The minimum unit to perform temporal contrast. """ def __init__( self, segment_size: int, sampling_stride: int = 1, optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.AdamW, optimizer_params: Dict[str, Any] = dict(learning_rate=1e-3), callbacks: List[Callback] = [], batch_size: int = 128, max_epochs: int = 10, verbose: int = 1, seed: Optional[int] = None, repr_dims: int = 320, hidden_dims: int = 64, num_layers: int = 10, temporal_unit: int = 0, ): self._repr_dims = repr_dims self._hidden_dims = hidden_dims self._num_layers = num_layers self._temporal_unit = temporal_unit super(TS2Vec, self).__init__( segment_size=segment_size, sampling_stride=sampling_stride, optimizer_fn=optimizer_fn, optimizer_params=optimizer_params, callbacks=callbacks, batch_size=batch_size, max_epochs=max_epochs, verbose=verbose, seed=seed, ) def _check_tsdataset( self, tsdataset: TSDataset ): """Ensure the robustness of input data (consistent feature order), at the same time, check whether the data types are compatible. If not, the processing logic is as follows: 1> Integer: Convert to np.int64. 2> Floating: Convert to np.float32. 3> Missing value: Warning. 4> Other: Illegal. Args: tsdataset(TSDataset): Data to be checked. """ target_columns = tsdataset.get_target().dtypes.keys() for column, dtype in tsdataset.dtypes.items(): if column in target_columns: raise_if_not( np.issubdtype(dtype, np.floating), f"TS2Vec's target dtype only supports [float16, float32, float64], " \ f"but received {column}: {dtype}." ) continue raise_if_not( np.issubdtype(dtype, np.floating), f"TS2Vec's cov(observed or known) dtype currently only supports [float16, float32, float64], " \ f"but received {column}: {dtype}." ) super(TS2Vec, self)._check_tsdataset(tsdataset) def _update_fit_params( self, train_tsdataset: List[TSDataset], ) -> Dict[str, Any]: """Infer parameters by TSdataset automatically. Args: train_tsdataset(List[TSDataset]): train dataset. Returns: Dict[str, Any]: model parameters. """ train_tsdataset = train_tsdataset[0] input_dim = train_tsdataset.get_target().data.shape[1] if train_tsdataset.get_observed_cov(): input_dim += train_tsdataset.get_observed_cov().data.shape[1] if train_tsdataset.get_known_cov(): input_dim += train_tsdataset.get_known_cov().data.shape[1] fit_params = { "input_dim": input_dim } return fit_params def _init_fit_dataloader( self, train_tsdataset: List[TSDataset] ) -> paddle.io.DataLoader: """Generate dataloader for train set. Args: train_tsdataset(List[TSDataset]): Train set. Returns: paddle.io.DataLoader: Training dataloader. """ data_adapter, samples = DataAdapter(), [] for tsdataset in train_tsdataset: self._check_tsdataset(tsdataset) dataset = data_adapter.to_sample_dataset( rawdataset=tsdataset, in_chunk_len=self._segment_size, sampling_stride=self._sampling_stride, fill_last_value=np.nan ) samples.extend(dataset.samples) dataset.samples = custom_collate_fn(samples) return data_adapter.to_paddle_dataloader(dataset, self._batch_size) def _init_network(self) -> paddle.nn.Layer: """Setup the network. Returns: paddle.nn.Layer """ return _TS2VecModule( in_channels=self._fit_params["input_dim"], out_channels=self._repr_dims, hidden_channels=self._hidden_dims, num_layers=self._num_layers, ) def _train_batch( self, X: Dict[str, paddle.Tensor], ) -> Dict[str, Any]: """Trains one batch of data. Args: X(Dict[str, paddle.Tensor]): Dict of feature tensor. Returns: Dict[str, Any]: Dict of logs. """ batch_logs = super(TS2Vec, self)._train_batch(X) self._network._avg_extractor.update_parameters( self._network._feat_extractor ) return batch_logs def _compute_loss( self, X: Dict[str, paddle.Tensor] ) -> paddle.Tensor: """Compute the loss. Args: X(Dict[str, paddle.Tensor]): Dict of feature tensor. Returns: paddle.Tensor: Loss value. """ feats = create_ts2vec_inputs(X) if feats.shape[1] > self._segment_size: offset = np.random.randint(feats.shape[1] - self._segment_size + 1) feats = feats[:, offset: offset + self._segment_size] aug1, aug2, overlap_len = create_contrastive_inputs(feats, self._temporal_unit) repr1 = self._network(aug1, mask="binomial")[:, -overlap_len:, :] repr2 = self._network(aug2, mask="binomial")[:, :overlap_len, :] loss = hierarchical_contrastive_loss(repr1, repr2) return loss def _encode( self, dataloader: paddle.io.DataLoader, mask: Optional[str] = "all_true", encoding_type: Optional[str] = None, sliding_len: Optional[int] = None, batch_size: Optional[int] = None, verbose: bool = True ) -> np.ndarray: """Encode function core logic. Args: dataloader(paddle.io.DataLoader): The data to be encoded. mask(str): The mask used by encoder can be specified with this parameter. ["all_true", "mask_last"] is optional. encoding_type(str): When this parameter is specified, the computed representation would be max pooling over all input series. ["full_series", "multiscale"] is optional. sliding_len(int): The contextual series length used for inference. batch_size(int): The batch size used for inference. If not specified, this would be the same batch size as training. verbose(bool): Turn on Verbose mode,set to true by default. Returns: np.ndarray: The representations for input series. """ def _encode_one_batch(buffer): """Encode one batch of data. """ padding_tensor = paddle.concat(buffer, axis=0) out = self._network(padding_tensor, mask) if encoding_type == "multiscale": out = multiscale_encoding(out) out = out[:, -1:, :].cpu() out = paddle.transpose(out, perm=[1, 0, 2]) buffer.clear() return out raise_if_not( mask in ("all_true", "mask_last"), f"mask must be either `all_true` or `mask_last`" ) if encoding_type is not None: raise_if_not( encoding_type in ("full_series", "multiscale"), f"encoding_type must be either `full_series` or `multiscale`" ) batch_size = ( self._batch_size if batch_size is None else batch_size ) data = iter(dataloader).next() feats = create_ts2vec_inputs(data) self._network.eval() # Sliding inference(casual) # The timestamp t's representation is computed # using the observations located in [t - sliding_len, t]. if sliding_len is not None: seq_len, buffer, reprs = feats.shape[1], [], [] for timestamp_idx in tqdm.tqdm(range(seq_len), disable=not verbose): start = timestamp_idx - sliding_len end = timestamp_idx + 1 padding_left = (-start if start < 0 else 0) padding_right = (end - seq_len if end > seq_len else 0) padding_tensor = F.pad( feats[:, max(start, 0): min(end, seq_len), :], (padding_left, padding_right), value=np.nan, data_format="NLC" ) # Accumulate a batch and encode together. buffer.append(padding_tensor) if len(buffer) < batch_size: continue out = _encode_one_batch(buffer) reprs.append(out) if len(buffer) > 0: out = _encode_one_batch(buffer) reprs.append(out) out = paddle.concat(reprs, axis=1) if encoding_type == "full_series": out = instance_level_encoding(out) out = paddle.squeeze(out, axis=1) return out.numpy() # instance level encoding. if encoding_type == "full_series": out = self._network(feats, mask) out = instance_level_encoding(out) out = paddle.squeeze(out, axis=1) return out.numpy() # raw output. out = self._network(feats, mask) return out.numpy()