Source code for paddlets.models.anomaly.dl._anomaly_transformer.attention

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

from typing import List, Dict, Any, Callable, Optional

from math import sqrt
import paddle.nn.functional as F
import numpy as np
import paddle
import math


class TriangularCausalMask:
    """
    Triangular causal mask.

    Holds a boolean tensor of shape [batch_size, 1, length, length] that is
    True strictly above the main diagonal and False elsewhere.

    Args:
        batch_size(int): Number of samples per batch.
        length(int): Length of samples per data.

    Attributes:
        _mask(paddle.Tensor): The precomputed boolean mask.
    """

    def __init__(self, batch_size: int, length: int):
        # The mask is a constant, so it is built outside of autograd.
        with paddle.no_grad():
            all_true = paddle.ones(
                shape=[batch_size, 1, length, length],
                dtype="bool",
            )
            # diagonal=1 keeps only the entries above the main diagonal.
            self._mask = paddle.triu(all_true, diagonal=1)

    @property
    def mask(self):
        """Return the precomputed boolean mask tensor."""
        return self._mask
class AnomalyAttention(paddle.nn.Layer):
    """
    Anomaly Attention.

    Two branches are evaluated on the same window:
    - the series-association, a softmax self-attention map learned from the
      raw series (queries/keys);
    - the prior-association, a Gaussian kernel over the relative temporal
      distance |i - j| whose width is driven by the learnable ``sigma``.

    Args:
        win_size(int): The size of the loopback window, i.e. the number of time steps feed to the model.
        mask_flag(bool): Whether to use attn_mask.
        scale(float|None): Multiplier applied to the dot products; when falsy,
            1 / sqrt(E) is used, where E is the per-head query feature size.
        attention_dropout(float): Dropout regularization parameter.
        output_attention(bool): Whether to output series, prior and sigma.

    Attributes:
        scale(float|None): See Args.
        mask_flag(bool): See Args.
        output_attention(bool): See Args.
        dropout(paddle.nn.Dropout): Dropout applied to the series-association.
        distances(paddle.Tensor): [win_size, win_size] matrix with entry (i, j) = |i - j|.
    """

    def __init__(
        self,
        win_size: int,
        mask_flag: bool = True,
        scale: Optional[float] = None,
        attention_dropout: float = 0.0,
        output_attention: bool = False,
    ):
        super(AnomalyAttention, self).__init__()
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = paddle.nn.Dropout(attention_dropout)
        # Build |i - j| in one vectorized step instead of a win_size**2 Python
        # loop of chained-index writes (`t[i][j] = v`), which is slow and only
        # takes effect when `t[i]` is a view of `t`.
        positions = paddle.arange(win_size, dtype=paddle.get_default_dtype())
        self.distances = paddle.abs(positions.unsqueeze(1) - positions.unsqueeze(0))

    def forward(
        self,
        queries: paddle.Tensor,
        keys: paddle.Tensor,
        values: paddle.Tensor,
        sigma: paddle.Tensor,
        attn_mask: Optional[Any],
    ) -> tuple:
        """
        Compute the attention output and, optionally, both associations.

        Args:
            queries(paddle.Tensor): Projected queries, laid out as [B, L, H, E].
            keys(paddle.Tensor): Projected keys, laid out as [B, S, H, E].
            values(paddle.Tensor): Projected values, laid out as [B, S, H, D].
            sigma(paddle.Tensor): A learnable scale parameter for the Gaussian kernel, laid out as [B, L, H],
                making the prior-associations adapt to the various time series patterns.
            attn_mask(object|None): Object exposing a boolean ``mask`` tensor. Only read when
                ``mask_flag`` is True; when None, a TriangularCausalMask is created.

        Returns:
            tuple: ``(V, None)`` when ``output_attention`` is False, otherwise
            ``(V, series, prior, sigma)`` where

            V(paddle.Tensor): Output of AnomalyAttention, [B, L, H, D].
            series(paddle.Tensor): The series-association from the self attention branch, [B, H, L, S].
            prior(paddle.Tensor): The prior-association from the Gaussian kernel branch.
            sigma(paddle.Tensor): The transformed Gaussian kernel scale, tiled along the last axis.
        """
        B, L, _, E = queries.shape
        scale = self.scale or 1. / sqrt(E)

        # Series-association branch: scaled dot-product attention.
        scores = paddle.einsum("blhe,bshe->bhls", queries, keys)
        if self.mask_flag:
            if attn_mask is None:
                attn_mask = TriangularCausalMask(B, L)
            # NOTE(review): relies on Tensor.masked_fill_ being available in
            # the installed Paddle version - confirm.
            scores.masked_fill_(attn_mask.mask, -np.inf)
        attn = scale * scores

        series = self.dropout(F.softmax(attn, axis=-1))
        V = paddle.einsum("bhls,bshd->blhd", series, values)

        if not self.output_attention:
            # The prior is only ever returned together with the series, so
            # skip the Gaussian kernel entirely when it would be discarded.
            return (V, None)

        # Prior-association branch: Gaussian kernel over |i - j|.
        window_size = attn.shape[-1]
        sigma = paddle.transpose(sigma, perm=[0, 2, 1])  # B L H -> B H L
        # Map sigma into a strictly positive range: 3 ** (sigmoid(5 * sigma) + 1e-5).
        sigma = paddle.nn.functional.sigmoid(sigma * 5) + 1e-5
        sigma = paddle.pow(paddle.to_tensor(3.), sigma)
        sigma = paddle.tile(sigma.unsqueeze(-1), repeat_times=[1, 1, 1, window_size])  # B H L L
        prior = paddle.tile(
            self.distances.unsqueeze(0).unsqueeze(0),
            [sigma.shape[0], sigma.shape[1], 1, 1],
        )
        # Gaussian density: 1 / (sqrt(2*pi) * sigma) * exp(-d^2 / (2 * sigma^2)).
        prior = 1.0 / (math.sqrt(2 * math.pi) * sigma) * paddle.exp(-prior ** 2 / 2 / (sigma ** 2))

        return (V, series, prior, sigma)
class AttentionLayer(paddle.nn.Layer):
    """
    AttentionLayer for anomaly transformer.

    Projects the inputs into per-head queries, keys, values and sigma, runs
    the wrapped attention, and projects the result back to ``d_model``.

    Args:
        attention(Callable[..., paddle.Tensor]): Attention layers in anomaly transformer.
        d_model(int): The expected feature size for the input of the anomaly transformer.
        n_heads(int): The number of heads in multi-head attention.
        d_keys(int|None): The feature size in key; defaults to d_model // n_heads.
        d_values(int|None): The feature size in value; defaults to d_model // n_heads.

    Attributes:
        norm(paddle.nn.LayerNorm): Layer norm over d_model (created here, not used in forward).
        inner_attention(Callable[..., paddle.Tensor]): The wrapped attention.
        query_projection(paddle.nn.Linear): d_model -> d_keys * n_heads.
        key_projection(paddle.nn.Linear): d_model -> d_keys * n_heads.
        value_projection(paddle.nn.Linear): d_model -> d_values * n_heads.
        sigma_projection(paddle.nn.Linear): d_model -> n_heads.
        out_projection(paddle.nn.Linear): d_values * n_heads -> d_model.
        n_heads(int): The number of heads.
    """

    def __init__(
        self,
        attention: Callable[..., paddle.Tensor],
        d_model: int,
        n_heads: int,
        d_keys: Optional[int] = None,
        d_values: Optional[int] = None,
    ):
        super(AttentionLayer, self).__init__()
        d_keys = d_keys or (d_model // n_heads)
        d_values = d_values or (d_model // n_heads)
        self.norm = paddle.nn.LayerNorm(d_model)
        self.inner_attention = attention
        self.query_projection = paddle.nn.Linear(d_model, d_keys * n_heads)
        self.key_projection = paddle.nn.Linear(d_model, d_keys * n_heads)
        self.value_projection = paddle.nn.Linear(d_model, d_values * n_heads)
        self.sigma_projection = paddle.nn.Linear(d_model, n_heads)
        self.out_projection = paddle.nn.Linear(d_values * n_heads, d_model)
        self.n_heads = n_heads

    def forward(
        self,
        queries: paddle.Tensor,
        keys: paddle.Tensor,
        values: paddle.Tensor,
        attn_mask: Callable[..., paddle.Tensor],
    ) -> tuple:
        """
        The series-association and the prior-association forward.

        Args:
            queries(paddle.Tensor): Input used for the query (and sigma) projection, [B, L, d_model].
            keys(paddle.Tensor): Input used for the key projection, [B, S, d_model].
            values(paddle.Tensor): Input used for the value projection, [B, S, d_model].
            attn_mask(Callable[..., paddle.Tensor]): Mask forwarded unchanged to the inner attention.

        Returns:
            tuple: ``(out, series, prior, sigma)`` where

            out(paddle.Tensor): ``self.out_projection`` applied to the attention output, [B, L, d_model].
            series(paddle.Tensor|None): The series-association output tensor.
            prior(paddle.Tensor|None): The prior-association output tensor.
            sigma(paddle.Tensor|None): The Gaussian kernel scale returned by the inner attention.

            series, prior and sigma are None whenever the inner attention does
            not return them.
        """
        B, L, _ = queries.shape
        _, S, _ = keys.shape
        H = self.n_heads

        # Sigma is projected from the un-projected query input.
        x = queries
        queries = paddle.reshape(self.query_projection(queries), [B, L, H, -1])
        keys = paddle.reshape(self.key_projection(keys), [B, S, H, -1])
        values = paddle.reshape(self.value_projection(values), [B, S, H, -1])
        sigma = paddle.reshape(self.sigma_projection(x), [B, L, H])

        result = self.inner_attention(queries, keys, values, sigma, attn_mask)
        # The inner attention may return fewer than four items (for example
        # ``(V, None)`` when attention maps are disabled); pad the missing
        # trailing outputs with None instead of failing on unpacking.
        out = result[0]
        extras = tuple(result[1:4])
        series, prior, sigma = extras + (None,) * (3 - len(extras))

        out = paddle.reshape(out, [B, L, -1])
        return self.out_projection(out), series, prior, sigma