Source code for paddlets.models.anomaly.dl.anomaly_transformer

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""
This implementation is based on the article `Anomaly Transformer: Time Series Anomaly Detection with Association Discrepancy <https://arxiv.org/pdf/2110.02642.pdf>`_ .

Base model features
    Basic architecture: a network built by stacking Anomaly-Attention layers.

    Anomaly-Attention has two branches: one is self-attention, the other is a Gaussian kernel.

    The backward pass computes the prior-association loss and the series-association loss, whose discrepancy is used to distinguish anomalous data in the raw data.

"""

import time
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import paddle
import paddle.nn.functional as F
from paddle.optimizer import Optimizer

from paddlets.models.anomaly.dl.anomaly_base import AnomalyBaseModel
from paddlets.models.common.callbacks import Callback
from paddlets.models.anomaly.dl.utils import to_tsdataset
from paddlets.models.anomaly.dl import utils as U
from paddlets.datasets import TSDataset
from paddlets.logger import raise_if, raise_if_not
from paddlets.models.anomaly.dl._anomaly_transformer.encoder import Encoder, EncoderLayer
from paddlets.models.anomaly.dl._anomaly_transformer.attention import AttentionLayer, AnomalyAttention
from paddlets.models.anomaly.dl._anomaly_transformer.embedding import DataEmbedding


class _Anomaly(paddle.nn.Layer):
    """
    Anomaly Transformer network structure.

    The input window is embedded, passed through a stack of ``e_layers``
    encoder layers whose attention is ``AnomalyAttention``, normalized with
    ``LayerNorm`` and finally projected to ``c_out`` features.

    Args:
        win_size(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        enc_in(int): The number of features in the model input.
        c_out(int): The number of features in the model output.
        d_model(int): The expected feature size for the input of the anomaly transformer.
        n_heads(int): The number of heads in multi-head attention.
        e_layers(int): The number of encoder layers to be stacked.
        d_ff(int): The number of channels of the FFN layers.
        dropout(float): Dropout regularization parameter.
        activation(Callable[..., paddle.Tensor]): The activation function passed to each ``EncoderLayer``.
        output_attention(bool): Whether ``forward`` also returns series, prior and sigmas.

    Attributes:
        output_attention(bool): Whether ``forward`` returns the attention outputs as well.
        embedding(DataEmbedding): Input embedding layer.
        encoder(Encoder): Stack of anomaly-attention encoder layers followed by ``LayerNorm``.
        projection(paddle.nn.Linear): Output projection from ``d_model`` to ``c_out`` features.
    """
    def __init__(
        self,
        win_size: int,
        enc_in: int,
        c_out: int,
        d_model: int = 512,
        n_heads: int = 8,
        e_layers: int = 3,
        d_ff: int = 512,
        dropout: float = 0.0,
        activation: Callable[..., paddle.Tensor] = F.gelu,
        output_attention: bool = True
    ):
        super(_Anomaly, self).__init__()
        self.output_attention = output_attention
        self.embedding = DataEmbedding(enc_in, d_model, dropout)
        self.encoder = Encoder(
            [
                EncoderLayer(
                    AttentionLayer(
                        # Second positional argument is passed as False;
                        # presumably the attention mask flag -- confirm in attention.py.
                        AnomalyAttention(
                            win_size, False,
                            attention_dropout=dropout,
                            output_attention=output_attention,
                        ),
                        d_model, n_heads
                    ),
                    d_model, d_ff, dropout=dropout, activation=activation
                )
                for _ in range(e_layers)
            ],
            norm_layer=paddle.nn.LayerNorm(d_model)
        )
        self.projection = paddle.nn.Linear(d_model, c_out, bias_attr=True)

    def forward(
        self,
        x: Dict[str, paddle.Tensor]
    ) -> Union[paddle.Tensor, Tuple[Any, ...]]:
        """Run the Anomaly Transformer forward pass.

        Args:
            x(Dict[str, paddle.Tensor]): Dict of feature tensors; only the
                ``"observed_cov_numeric"`` entry is used.

        Returns:
            paddle.Tensor | tuple: The projected output when ``output_attention``
            is False; otherwise the tuple ``(output, series, prior, sigmas)``,
            where the last three items are exactly what the encoder returns.
        """
        features = x["observed_cov_numeric"]
        enc_out = self.embedding(features)
        enc_out, series, prior, sigmas = self.encoder(enc_out)
        enc_out = self.projection(enc_out)
        if self.output_attention:
            return enc_out, series, prior, sigmas
        return enc_out  # assumed [B, L, D] -- TODO confirm against DataEmbedding


class AnomalyTransformer(AnomalyBaseModel):
    """
    Anomaly Transformer network for anomaly detection.

    Args:
        in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        sampling_stride(int): Sampling interval between two adjacent samples.
        loss_fn(Callable[..., paddle.Tensor]): Loss function.
        optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
        threshold_fn(Callable[..., float]|None): The method to get the anomaly threshold.
        criterion(Callable[..., paddle.Tensor]): Loss function for the reconstruction loss.
        threshold(float|None): The threshold to judge anomaly. Note that ``predict`` recomputes
            it with ``threshold_fn`` and overwrites the stored value.
        threshold_coeff(float): The coefficient of threshold.
        anomaly_score_fn(Callable[..., List[float]]|None): The method to get anomaly score.
        pred_adjust(bool): Whether to adjust the pred label according to the real label.
        pred_adjust_fn(Callable[..., np.ndarray]|None): The method to adjust pred label.
        optimizer_params(Dict[str, Any]|None): Optimizer parameters. ``None`` means
            ``{"learning_rate": 1e-4}``.
        eval_metrics(List[str]|None): Evaluation metrics of model. ``None`` means an empty list.
        callbacks(List[Callback]|None): Customized callback functions. ``None`` means an empty list.
        batch_size(int): Number of samples per batch.
        max_epochs(int): Max epochs during training.
        verbose(int): Verbosity mode.
        patience(int): Number of epochs to wait for improvement before terminating.
        seed(int|None): Global random seed.
        temperature(int|float): A parameter to adjust series loss and prior loss.
        k(int): Coefficient passed to ``loss_fn``; the optimization enlarges the association discrepancy.
        anormly_ratio(int|float): The proportion of anomaly data in the train and test sets,
            passed to ``threshold_fn``.
        d_model(int): The expected feature size for the input of the anomaly transformer.
        n_heads(int): The number of heads in multi-head attention.
        e_layers(int): The number of encoder layers to be stacked.
        d_ff(int): The number of channels of the FFN layers.
        dropout(float): Dropout regularization parameter.
        activation(Callable[..., paddle.Tensor]): The activation function of the encoder layers.
        output_attention(bool): Whether the network outputs series, prior and sigma. The
            training and prediction paths rely on these outputs, so this should stay True.

    Attributes:
        _in_chunk_len(int): The size of the loopback window, i.e. the number of time steps fed to the model.
        _sampling_stride(int): Sampling interval between two adjacent samples.
        _loss_fn(Callable[..., paddle.Tensor]): Loss function.
        _optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
        _threshold_fn(Callable[..., float]|None): The method to get the anomaly threshold.
        _criterion(Callable[..., paddle.Tensor]): Loss function for the reconstruction loss.
        _threshold(float|None): The threshold to judge anomaly.
        _threshold_coeff(float): The coefficient of threshold.
        _anomaly_score_fn(Callable[..., List[float]]|None): The method to get anomaly score.
        _pred_adjust(bool): Whether to adjust the pred label according to the real label.
        _pred_adjust_fn(Callable[..., np.ndarray]|None): The method to adjust pred label.
        _optimizer_params(Dict[str, Any]): Optimizer parameters.
        _eval_metrics(List[str]): Evaluation metrics of model.
        _callbacks(List[Callback]): Customized callback functions.
        _batch_size(int): Number of samples per batch.
        _max_epochs(int): Max epochs during training.
        _verbose(int): Verbosity mode.
        _patience(int): Number of epochs to wait for improvement before terminating.
        _seed(int|None): Global random seed.
        _temperature(int|float): A parameter to adjust series loss and prior loss.
        _k(int): Coefficient passed to ``loss_fn``.
        _anormly_ratio(int|float): The proportion of anomaly data in the train and test sets.
        _d_model(int): The expected feature size for the input of the anomaly transformer.
        _n_heads(int): The number of heads in multi-head attention.
        _e_layers(int): The number of encoder layers to be stacked.
        _d_ff(int): The number of channels of the FFN layers.
        _dropout(float): Dropout regularization parameter.
        _activation(Callable[..., paddle.Tensor]): The activation function of the encoder layers.
        _output_attention(bool): Whether to output series, prior and sigma.
        _adjust_lr(function): Dynamic learning-rate adjustment applied after every epoch.
    """
    def __init__(
        self,
        in_chunk_len: int,
        sampling_stride: int = 1,
        loss_fn: Callable[..., paddle.Tensor] = U.series_prior_loss,
        optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.Adam,
        threshold_fn: Callable[..., float] = U.anomaly_get_threshold,
        criterion: Callable[..., paddle.Tensor] = paddle.nn.MSELoss(),
        threshold: Optional[float] = None,
        threshold_coeff: float = 1.0,
        anomaly_score_fn: Optional[Callable[..., List[float]]] = None,
        pred_adjust: bool = True,
        pred_adjust_fn: Callable[..., np.ndarray] = U.result_adjust,
        optimizer_params: Optional[Dict[str, Any]] = None,
        eval_metrics: Optional[List[str]] = None,
        callbacks: Optional[List[Callback]] = None,
        batch_size: int = 32,
        max_epochs: int = 100,
        verbose: int = 1,
        patience: int = 10,
        seed: Optional[int] = None,
        temperature: int = 50,
        k: int = 3,
        anormly_ratio: Union[int, float] = 1,
        d_model: int = 512,
        n_heads: int = 8,
        e_layers: int = 3,
        d_ff: int = 512,
        dropout: float = 0.0,
        activation: Callable[..., paddle.Tensor] = F.gelu,
        output_attention: bool = True,
    ):
        # Resolve container defaults per call so that separate model
        # instances never share (and possibly mutate) the same object.
        if optimizer_params is None:
            optimizer_params = dict(learning_rate=1e-4)
        if eval_metrics is None:
            eval_metrics = []
        if callbacks is None:
            callbacks = []

        self.in_chunk_len = in_chunk_len
        self._d_model = d_model
        self._n_heads = n_heads
        self._e_layers = e_layers
        self._d_ff = d_ff
        self._dropout = dropout
        self._activation = activation
        self._output_attention = output_attention
        self._adjust_lr = U.adjust_learning_rate
        self._threshold_fn = threshold_fn
        self._criterion = criterion
        self._temperature = temperature
        self._k = k
        self._anormly_ratio = anormly_ratio
        super(AnomalyTransformer, self).__init__(
            in_chunk_len=in_chunk_len,
            sampling_stride=sampling_stride,
            loss_fn=loss_fn,
            optimizer_fn=optimizer_fn,
            threshold=threshold,
            threshold_coeff=threshold_coeff,
            threshold_fn=threshold_fn,
            anomaly_score_fn=anomaly_score_fn,
            pred_adjust=pred_adjust,
            pred_adjust_fn=pred_adjust_fn,
            optimizer_params=optimizer_params,
            eval_metrics=eval_metrics,
            callbacks=callbacks,
            batch_size=batch_size,
            max_epochs=max_epochs,
            verbose=verbose,
            patience=patience,
            seed=seed,
        )

    def _update_fit_params(
        self,
        train_tsdataset: TSDataset,
        valid_tsdataset: Optional[TSDataset] = None
    ) -> Dict[str, Any]:
        """Infer parameters from the TSDataset automatically.

        Args:
            train_tsdataset(TSDataset): Train dataset.
            valid_tsdataset(TSDataset|None): Validation dataset (unused here).

        Returns:
            Dict[str, Any]: model parameters; ``observed_dim`` is the number of
            observed covariate columns of the train dataset.
        """
        fit_params = {
            "observed_dim": train_tsdataset.get_observed_cov().data.shape[1]
        }
        return fit_params

    def _init_network(self) -> paddle.nn.Layer:
        """Set up the network.

        Input and output feature sizes are both the observed covariate
        dimension, since the model reconstructs its own input.

        Returns:
            paddle.nn.Layer.
        """
        return _Anomaly(
            win_size=self.in_chunk_len,
            enc_in=self._fit_params["observed_dim"],
            c_out=self._fit_params["observed_dim"],
            d_model=self._d_model,
            n_heads=self._n_heads,
            e_layers=self._e_layers,
            d_ff=self._d_ff,
            dropout=self._dropout,
            activation=self._activation,
            output_attention=self._output_attention,
        )

    def _predict_batch(
        self,
        X: Dict[str, paddle.Tensor]
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Predict one batch of data.

        Args:
            X(Dict[str, paddle.Tensor]): Dict of feature tensors.

        Returns:
            Tuple[np.ndarray, np.ndarray]: The observed input and the network
            reconstruction, both as numpy arrays.
        """
        output = self._network(X)
        # With output_attention the network returns (output, series, prior, sigmas);
        # otherwise it returns the bare tensor, which must not be indexed.
        scores = output[0] if isinstance(output, (tuple, list)) else output
        y = X['observed_cov_numeric']
        return y.numpy(), scores.numpy()

    @to_tsdataset(scenario="anomaly_label")
    def predict(
        self,
        test_dataset: TSDataset,
        train_dataset: TSDataset = None,
    ) -> TSDataset:
        """Get anomaly labels; the result is output as a TSDataset.

        The threshold is recomputed on every call from both datasets via
        ``threshold_fn`` and stored in ``self._threshold``.

        Args:
            test_dataset(TSDataset): Data to be predicted.
            train_dataset(TSDataset): Train set, required to calculate the threshold.

        Returns:
            TSDataset.

        Raises:
            ValueError (via ``raise_if``): If ``train_dataset`` is None.
        """
        raise_if(train_dataset is None, "Please pass in train_dataset to calculate the threshold.")
        train_dataloader = self._init_predict_dataloader(train_dataset)
        test_dataloader = self._init_predict_dataloader(test_dataset)
        # Stride equal to the window length gives non-overlapping windows for scoring.
        thre_dataloader = self._init_predict_dataloader(test_dataset, sampling_stride=self.in_chunk_len)
        self._threshold = self._threshold_fn(
            self._network,
            train_dataloader,
            test_dataloader,
            temperature=self._temperature,
            anormly_ratio=self._anormly_ratio,
            criterion=self._criterion,
            my_kl_loss=U.my_kl_loss,
            win_size=self.in_chunk_len,
        )
        anomaly_score = self._get_anomaly_score(thre_dataloader)
        threshold = self._threshold
        anomaly_label = [0 if score < threshold else 1 for score in anomaly_score]
        if test_dataset.target is not None and self._pred_adjust:
            anomaly_label = self._pred_adjust_fn(anomaly_label, test_dataset.target.to_numpy())
        return np.array(anomaly_label)

    def _predict(
        self,
        dataloader: paddle.io.DataLoader,
    ) -> np.ndarray:
        """Predict function core logic.

        Args:
            dataloader(paddle.io.DataLoader): Data to be predicted.

        Returns:
            np.ndarray: Flattened per-point energies (empty if the loader yields no batches).
        """
        self._network.eval()
        attens_energy = []
        # Inference only: no autograd graph is needed.
        with paddle.no_grad():
            for data in dataloader:
                y = data['observed_cov_numeric']
                output, series, prior, sigmas = self._network(data)
                loss = paddle.mean(self._criterion(y, output), axis=-1)
                cri = U.series_prios_energy(
                    [output, series, prior, sigmas],
                    loss,
                    temperature=self._temperature,
                    win_size=self.in_chunk_len,
                )
                attens_energy.append(cri)
        if not attens_energy:
            return np.array([])
        return np.concatenate(attens_energy, axis=0).reshape(-1)

    def predict_score(
        self,
        tsdataset: TSDataset,
    ) -> TSDataset:
        """Get anomaly scores; the result is output as a TSDataset.

        Args:
            tsdataset(TSDataset): Data to be predicted.

        Returns:
            TSDataset: One ``<target>_score`` column per target column, or a single
            ``anomaly_score`` column when the dataset has no target.

        Raises:
            ValueError (via ``raise_if``): If no score was produced.
        """
        dataloader = self._init_predict_dataloader(tsdataset, sampling_stride=self.in_chunk_len)
        results = self._get_anomaly_score(dataloader)
        # Generate target cols
        target_cols = tsdataset.get_target()
        if target_cols is None:
            target_cols = ["anomaly_score"]
        else:
            target_cols = target_cols.data.columns
            target_cols = target_cols + '_score'
        # Generate target index freq
        target_index = tsdataset.get_observed_cov().data.index
        if isinstance(target_index, pd.RangeIndex):
            freq = target_index.step
        else:
            freq = target_index.freqstr
        results_size = results.size
        raise_if(
            results_size == 0,
            f"There is something wrong, anomaly predict size is 0, you'd better check the tsdataset or the predict logic."
        )
        # Scores are aligned to the head of the index.
        target_index = target_index[:results_size]
        anomaly_target = pd.DataFrame(results, index=target_index, columns=target_cols)
        return TSDataset.load_from_dataframe(anomaly_target, freq=freq)

    def fit(
        self,
        train_tsdataset: TSDataset,
        valid_tsdataset: Optional[TSDataset] = None
    ):
        """Train the neural network stored in self._network.

        Args:
            train_tsdataset(TSDataset): Train set.
            valid_tsdataset(TSDataset|None): Eval set, used for early stopping.
        """
        self._fit_params = self._update_fit_params(train_tsdataset, valid_tsdataset)
        train_dataloader, valid_dataloaders = self._init_fit_dataloaders(train_tsdataset, valid_tsdataset)
        self._fit(train_dataloader, valid_dataloaders)

    def _fit(
        self,
        train_dataloader: paddle.io.DataLoader,
        valid_dataloaders: List[paddle.io.DataLoader] = None
    ):
        """Fit function core logic.

        Args:
            train_dataloader(paddle.io.DataLoader): Train set.
            valid_dataloaders(List[paddle.io.DataLoader]|None): Eval sets; None means no evaluation.
        """
        if valid_dataloaders is None:
            valid_dataloaders = []
        valid_names = [f"val_{k}" for k in range(len(valid_dataloaders))]
        self._metrics, self._metrics_names, \
            self._metric_container_dict = self._init_metrics(valid_names)
        self._history, self._callback_container = self._init_callbacks()
        self._network = self._init_network()
        self._optimizer = self._init_optimizer()
        # Base learning rate for the per-epoch schedule; fall back to the
        # optimizer's initial rate when optimizer_params does not set one.
        base_lr = self._optimizer_params.get("learning_rate", self._optimizer.get_lr())

        # Call the `on_train_begin` method of each callback before the training starts.
        self._callback_container.on_train_begin({"start_time": time.time()})
        for epoch_idx in range(self._max_epochs):
            # Call the `on_epoch_begin` method of each callback before the epoch starts.
            self._callback_container.on_epoch_begin(epoch_idx)
            self._train_epoch(train_dataloader)
            # NOTE(review): epoch_idx is 0-based; confirm U.adjust_learning_rate expects that.
            self._adjust_lr(self._optimizer, epoch_idx, base_lr)

            # Predict for each eval set.
            for eval_name, valid_dataloader in zip(valid_names, valid_dataloaders):
                self._predict_epoch(eval_name, valid_dataloader)

            # Call the `on_epoch_end` method of each callback at the end of the epoch.
            self._callback_container.on_epoch_end(
                epoch_idx, logs=self._history._epoch_metrics
            )
            if self._stop_training:
                break

        # Call the `on_train_end` method of each callback at the end of the training.
        self._callback_container.on_train_end()
        self._network.eval()

    def _train_epoch(
        self,
        train_loader: paddle.io.DataLoader
    ):
        """Train one epoch of the network in self._network.

        Args:
            train_loader(paddle.io.DataLoader): Training dataloader.
        """
        self._network.train()
        for batch_idx, data in enumerate(train_loader):
            self._callback_container.on_batch_begin(batch_idx)
            # The model reconstructs its input, so the target is the observed input itself.
            y = data['observed_cov_numeric']
            batch_logs = self._train_batch(data, y, batch_idx)
            self._callback_container.on_batch_end(batch_idx, batch_logs)
        epoch_logs = {"lr": self._optimizer.get_lr()}
        self._history._epoch_metrics.update(epoch_logs)

    def _train_batch(
        self,
        X: Dict[str, paddle.Tensor],
        y: paddle.Tensor,
        batch_idx=None,
    ) -> Dict[str, Any]:
        """Train one batch of data.

        Args:
            X(Dict[str, paddle.Tensor]): Dict of feature tensors.
            y(paddle.Tensor): Target tensor.
            batch_idx(int|None): Index of the batch (unused).

        Returns:
            Dict[str, Any]: Dict of logs.
        """
        self._optimizer.clear_grad()
        output_list = self._network(X)
        loss1, loss2, for_loss_one = self._compute_loss(
            output_list, y, criterion=self._criterion, win_size=self.in_chunk_len, k=self._k
        )
        # Minimax strategy: both losses share one graph, so the first backward
        # must retain it for the second.
        loss1.backward(retain_graph=True)
        loss2.backward()
        self._optimizer.step()
        batch_logs = {
            "batch_size": y.shape[0],
            "loss": for_loss_one
        }
        return batch_logs

    def _compute_loss(
        self,
        y_score: paddle.Tensor,
        y_true: paddle.Tensor,
        criterion=paddle.nn.MSELoss(),
        win_size=100,
        k=3
    ) -> Tuple[Any, Any, Any]:
        """Compute the loss.

        Note:
            This function could be overridden by the subclass if necessary.

        Args:
            y_score(paddle.Tensor): Network outputs (with output_attention, the
                ``(output, series, prior, sigmas)`` tuple).
            y_true(paddle.Tensor): Ground truth (correct) target values.
            criterion(Callable[..., paddle.Tensor]): Reconstruction loss function.
            win_size(int): The size of the loopback window, i.e. the number of time steps fed to the model.
            k(int): Coefficient passed to ``loss_fn``.

        Returns:
            Tuple: Whatever ``loss_fn`` returns; ``_train_batch`` unpacks it as
            ``(loss1, loss2, loss_for_logging)``.
        """
        return self._loss_fn(y_score, y_true, criterion, win_size, k)