Source code for paddlets.models.anomaly.dl.autoencoder

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

from typing import List, Dict, Any, Callable, Optional
import collections

from paddle.optimizer import Optimizer
import paddle.nn.functional as F
import numpy as np
import paddle

from paddlets.models.anomaly.dl.anomaly_base import AnomalyBaseModel
from paddlets.models.anomaly.dl._ed.ed import MLP, CNN
from paddlets.models.common.callbacks import Callback
from paddlets.models.anomaly.dl import utils as U
from paddlets.datasets import TSDataset
from paddlets.logger import raise_if, raise_if_not

       
class _AEBlock(paddle.nn.Layer):
    """AE Network structure.

    Args:
        in_chunk_len(int): The size of the loopback window, i.e. the number of time steps feed to the model.
        ed_type(str): The type of encoder and decoder.
        fit_params(dict): The parameters for fitting, including dimensions and dict sizes of variables.
        hidden_config(List[int]): The ith element represents the number of neurons in the ith hidden layer.
        activation(Callable[..., paddle.Tensor]): The activation function for the hidden layers.
        last_layer_activation(Callable[..., paddle.Tensor]): The activation function for the last layer.
        kernel_size(int): Kernel size for Conv1D.
        dropout_rate(float): Dropout regularization parameter.
        use_bn(bool): Whether to use batch normalization.
        embedding_size(int): The size of each one-dimension embedding vector.
        pooling(bool): Whether to use average pooling to aggregate embeddings, if False, concat each embedding.
        
    Attributes:
        _pooling(bool): Whether to use average pooling to aggregate embeddings, if False, concat each embedding.
        _cat_size(int): The category feature size.
        _cat_dim(int): The category feature dims after embedding.
        _num_dim(int): The numerical feature dims.
        _observed_cat_emb(List[int]): The emb of category feature.
        _encoder(paddle.nn.Sequential): Dynamic graph LayerList for encoder.
        _decoder(paddle.nn.Sequential): Dynamic graph LayerList for decoder.
    """
    def __init__(
        self,
        in_chunk_len: int,
        ed_type: str,
        fit_params: Dict[str, Any],
        hidden_config: List[int],
        activation: Callable[..., paddle.Tensor],
        last_layer_activation: Callable[..., paddle.Tensor],
        kernel_size: int,
        dropout_rate: float,
        use_bn: bool,
        embedding_size: int,
        pooling: bool,
    ):
        super(_AEBlock, self).__init__()
        raise_if_not(
            ed_type in ("MLP", "CNN"), 
            "`ae_type` must be either 'MLP' or 'CNN'"
        )
        raise_if(
            np.any(np.array(hidden_config) <= 0),
            f"hidden_config must be > 0, got {hidden_config}."
        )
        # embedding cate feature
        self._pooling = pooling
        self._cat_size = len(fit_params['observed_cat_cols'])
        self._cat_dim = 0
        self._num_dim = fit_params['observed_num_dim']
        if fit_params['observed_cat_cols']:
            observed_cat_cols = fit_params['observed_cat_cols']
            self._observed_cat_emb = []
            for col, col_size in observed_cat_cols.items():
                self._observed_cat_emb.append(paddle.nn.Embedding(col_size, embedding_size))
            if pooling:
                self._cat_dim = embedding_size
            else:
                self._cat_dim = embedding_size * len(observed_cat_cols)
                
        feature_dim = self._num_dim + self._cat_dim
        if ed_type == 'MLP':
            self._encoder = MLP(in_chunk_len, feature_dim, hidden_config, \
                               activation, last_layer_activation, dropout_rate, use_bn)
            self._decoder = MLP(hidden_config[-1], feature_dim, hidden_config[::-1][1:] + [in_chunk_len], \
                               activation, last_layer_activation, dropout_rate, use_bn)
        elif ed_type == 'CNN':
            for i in range(len(hidden_config)):
                out_chunk_len = in_chunk_len - kernel_size + 1
                raise_if(
                    out_chunk_len < 1,
                    "Conv1d output size must be greater than or equal to 1, "
                    "Please choose a smaller `kernel_size` or bigger `in_chunk_len`"
                )
                in_chunk_len = out_chunk_len
            self._encoder = CNN(feature_dim, hidden_config, activation, last_layer_activation, kernel_size, \
                                dropout_rate, use_bn, is_encoder=True)
            self._decoder = CNN(hidden_config[-1], hidden_config[::-1][1:] + [feature_dim], \
                                activation, last_layer_activation, kernel_size, dropout_rate, use_bn, is_encoder=False)
    def forward(
        self, 
        X: Dict[str, paddle.Tensor]
    ) -> paddle.Tensor:
        """Forward.

        Args: 
            X(paddle.Tensor): Dict of feature tensor.

        Returns:
            paddle.Tensor: Output of model.
        """        
        x = paddle.transpose(X["observed_cov_numeric"], perm=[0, 2, 1])
        if self._cat_size > 0:
            feature_cat = []
            observed_cat = paddle.transpose(X["observed_cov_categorical"], perm=[0, 2, 1])
            for i in range(self._cat_size):
                feature_cat.append(self._observed_cat_emb[i](observed_cat[:, i]))
            if self._pooling:
                feature_cat = paddle.stack(feature_cat, axis=-1).mean(axis=-1)
            else:
                feature_cat = paddle.concat(feature_cat, axis=-1)
            feature_cat = paddle.transpose(feature_cat, perm=[0, 2, 1])
            x = paddle.concat([x, feature_cat], axis=-2)
            
        h = self._encoder(x)
        recon = self._decoder(h)
        
        return paddle.transpose(recon, perm=[0, 2, 1]), paddle.transpose(x, perm=[0, 2, 1])
        

[docs]class AutoEncoder(AnomalyBaseModel): """Auto encoder network for anomaly detection. Args: in_chunk_len(int): The size of the loopback window, i.e. the number of time steps feed to the model. sampling_stride(int): Sampling intervals between two adjacent samples. loss_fn(Callable[..., paddle.Tensor]): Loss function. optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm. threshold_fn(Callable[..., float]|None): The method to get anomaly threshold. q(float): The parameter used to calculate the quantile which range is [0, 100]. threshold(float|None): The threshold to judge anomaly. anomaly_score_fn(Callable[..., List[float]]|None): The method to get anomaly score. pred_adjust(bool): Whether to adjust the pred label according to the real label. pred_adjust_fn(Callable[..., np.ndarray]|None): The method to adjust pred label. optimizer_params(Dict[str, Any]): Optimizer parameters. eval_metrics(List[str]): Evaluation metrics of model. callbacks(List[Callback]): Customized callback functions. batch_size(int): Number of samples per batch. max_epochs(int): Max epochs during training. verbose(int): Verbosity mode. patience(int): Number of epochs to wait for improvement before terminating. seed(int|None): Global random seed. ed_type(str): The type of encoder and decoder. activation(Callable[..., paddle.Tensor]): The activation function for the hidden layers. last_layer_activation(Callable[..., paddle.Tensor]): The activation function for the last layer. hidden_config(List[int]|None): The ith element represents the number of neurons in the ith hidden layer. kernel_size(int): Kernel size for Conv1D. dropout_rate(float): Dropout regularization parameter. use_bn(bool): Whether to use batch normalization. embedding_size(int): The size of each embedding vector. pooling: Whether to use average pooling to aggregate embeddings, if False, concat each embedding. Attributes: _in_chunk_len(int): The size of the loopback window, i.e. the number of time steps feed to the model. _sampling_stride(int): Sampling intervals between two adjacent samples. _loss_fn(Callable[..., paddle.Tensor]): Loss function. _optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm. _threshold_fn(Callable[..., float]|None)): The method to get anomaly threshold. _q(float): The parameter used to calculate the quantile which range is [0, 100]. _threshold(float|None): The threshold to judge anomaly. _anomaly_score_fn(Callable[..., List[float]]|None): The method to get anomaly score. _pred_adjust(bool): Whether to adjust the pred label according to the real label. _pred_adjust_fn(Callable[..., np.ndarray]|None): The method to adjust pred label. _optimizer_params(Dict[str, Any]): Optimizer parameters. _eval_metrics(List[str]): Evaluation metrics of model. _callbacks(List[Callback]): Customized callback functions. _batch_size(int): Number of samples per batch. _max_epochs(int): Max epochs during training. _verbose(int): Verbosity mode. _patience(int): Number of epochs to wait for improvement before terminating. _seed(int|None): Global random seed. _stop_training(bool): Training status. _ed_type(str): The type of encoder and decoder. _activation(Callable[..., paddle.Tensor]): The activation function for the hidden layers. _last_layer_activation(Callable[..., paddle.Tensor]): The activation function for the last layer. _hidden_config(List[int]|None): The ith element represents the number of neurons in the ith hidden layer. _kernel_size(int): Kernel size for Conv1D. _dropout_rate(float): Dropout regularization parameter. _use_bn(bool): Whether to use batch normalization. _embedding_size(int): The size of each embedding vector. _pooling(bool): Whether to use average pooling to aggregate embeddings, if False, concat each embedding. """ def __init__( self, in_chunk_len: int, sampling_stride: int = 1, loss_fn: Callable[..., paddle.Tensor] = F.mse_loss, optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.Adam, threshold_fn: Callable[..., float] = U.percentile, q: float = 100, threshold: Optional[float] = None, threshold_coeff: float = 1.0, anomaly_score_fn: Callable[..., List[float]] = None, pred_adjust: bool = False, pred_adjust_fn: Callable[..., np.ndarray] = U.result_adjust, optimizer_params: Dict[str, Any] = dict(learning_rate=1e-3), eval_metrics: List[str] = [], callbacks: List[Callback] = [], batch_size: int = 32, max_epochs: int = 100, verbose: int = 1, patience: int = 10, seed: Optional[int] = None, ed_type: str = 'MLP', activation: Callable[..., paddle.Tensor] = paddle.nn.ReLU, last_layer_activation: Callable[..., paddle.Tensor] = paddle.nn.Identity, use_bn: bool = False, hidden_config: List[int] = None, kernel_size: int = 3, dropout_rate: float = 0.2, embedding_size: int = 16, pooling: bool = False, ): self._hidden_config = ( hidden_config if hidden_config else [32, 16] ) self._use_bn = use_bn self._kernel_size = kernel_size self._ed_type = ed_type self._activation = activation self._last_layer_activation = last_layer_activation self._dropout_rate = dropout_rate self._embedding_size = embedding_size self._pooling = pooling self._q = q super(AutoEncoder, self).__init__( in_chunk_len=in_chunk_len, sampling_stride=sampling_stride, loss_fn=loss_fn, optimizer_fn=optimizer_fn, threshold=threshold, threshold_coeff=threshold_coeff, threshold_fn=threshold_fn, anomaly_score_fn=anomaly_score_fn, pred_adjust=pred_adjust, pred_adjust_fn=pred_adjust_fn, optimizer_params=optimizer_params, eval_metrics=eval_metrics, callbacks=callbacks, batch_size=batch_size, max_epochs=max_epochs, verbose=verbose, patience=patience, seed=seed, ) def _update_fit_params( self, train_tsdataset: TSDataset, valid_tsdataset: Optional[TSDataset] = None ) -> Dict[str, Any]: """Infer parameters by TSdataset automatically. Args: train_tsdataset(TSDataset): train dataset. valid_tsdataset(TSDataset|None): validation dataset. Returns: Dict[str, Any]: model parameters. """ train_df = train_tsdataset.to_dataframe() observed_cat_cols = collections.OrderedDict() observed_num_cols = [] observed_train_tsdataset = train_tsdataset.get_observed_cov() observed_dtypes = dict(observed_train_tsdataset.dtypes) for col in observed_train_tsdataset.columns: if np.issubdtype(observed_dtypes[col], np.integer): observed_cat_cols[col] = len(train_df[col].unique()) else: observed_num_cols.append(col) fit_params = { "observed_cat_cols": observed_cat_cols, "observed_num_dim": len(observed_num_cols), "observed_cat_dim": len(observed_cat_cols), } return fit_params def _init_network(self) -> paddle.nn.Layer: """Setup the network. Returns: paddle.nn.Layer. """ return _AEBlock( self._in_chunk_len, self._ed_type, self._fit_params, self._hidden_config, self._activation, self._last_layer_activation, self._kernel_size, self._dropout_rate, self._use_bn, self._embedding_size, self._pooling ) def _get_threshold( self, anomaly_score: np.ndarray ) -> float: """Get the threshold value to judge anomaly. Args: anomaly_score(np.ndarray): Returns: float: Thresold value. """ return self._threshold_fn(anomaly_score, self._q)