Source code for paddlets.transform.base

# !/usr/bin/env python3
# -*- coding:utf-8 -*-

import abc
import copy
from typing import Callable, List, Optional, Union
from paddlets.utils.utils import get_tsdataset_max_len, split_dataset

import numpy as np
import pandas as pd

from paddlets import TSDataset, TimeSeries
from paddlets.logger import raise_if_not, raise_if, raise_log
from paddlets.logger.logger import log_decorator

# Names of the column types recognised by this module. When `out_col_types` is
# given to UdBaseTransform as a list, `_check_output` rejects any entry that is
# not one of these names.
TSDATASET_COL_TYPES = ['target', 'observed_cov', 'known_cov']

class BaseTransform(object, metaclass=abc.ABCMeta):
    """
    Base class for all data transformation classes (named `transformers` in this module).

    The public entry points :func:`fit`, :func:`transform`, :func:`fit_transform` and
    :func:`inverse_transform` accept either a single TSDataset or a list of TSDataset and
    dispatch to the per-dataset hooks. A concrete transformer must implement the abstract
    hooks :func:`fit_one` and :func:`transform_one`; :func:`inverse_transform_one` is optional.
    """

    def __init__(self):
        # Whether the transformer needs rows preceding the requested ones to generate features.
        self.need_previous_data = False
        # How many preceding rows are needed. `transform_n_rows` treats -1 as a special
        # value: the whole dataset is transformed before the result is split.
        self.n_rows_pre_data_need = 0

    def _check_multi_tsdataset(self, datasets: List[TSDataset]):
        """
        Check the validity of multi time series combination transform.

        Args:
            datasets(List[TSDataset]): datasets from which to fit or transform the transformer.

        Raises:
            Whatever `raise_if` / `raise_if_not` raise when the list is empty or when the
            datasets do not all share the same `columns` mapping.
        """
        raise_if(
            len(datasets) == 0,
            "The Length of datasets cannot be 0!"
        )
        # Each schema is reduced to a hashable, order-independent form so that a set
        # collapses identical schemas into a single element.
        schemas = {tuple(sorted(ts.columns.items())) for ts in datasets}
        raise_if_not(
            len(schemas) == 1,
            f"Invalid tsdatasets. The given tsdataset column schema ({[ts.columns for ts in datasets]}) must be same."
        )

    def fit(self, dataset: Union[TSDataset, List[TSDataset]]):
        """
        Learn the parameters from the dataset needed by the transformer.

        A single TSDataset is forwarded to :func:`fit_one` unchanged. A list of TSDataset is
        first validated, then the data of each column type is concatenated across all
        datasets (with the index reset) into one temporary TSDataset that is passed to
        :func:`fit_one`.

        The parameters fitted by this method is transformer-specific. For example, the
        `MinMaxScaler` needs to compute the MIN and MAX, and the `StandardScaler` needs to
        compute the MEAN and STD (standard deviation) from the dataset.

        Args:
            dataset(Union[TSDataset, List[TSDataset]]): dataset from which to fit the transformer.

        Returns:
            Whatever :func:`fit_one` returns.
        """
        if not isinstance(dataset, list):
            return self.fit_one(dataset)

        self._check_multi_tsdataset(dataset)
        ts_build_param = {}
        for attr in TSDATASET_COL_TYPES:
            # The schema check above guarantees all datasets agree on their columns, so
            # the first dataset decides whether this column type is present at all.
            if getattr(dataset[0], attr) is None:
                ts_build_param[attr] = None
                continue
            merged = pd.concat(
                [getattr(data, attr).data for data in dataset]
            ).reset_index(drop=True)
            ts_build_param[attr] = TimeSeries(merged, 1)

        # new_dataset is not a standard TSDataset, only use fit
        new_dataset = TSDataset(**ts_build_param)
        return self.fit_one(new_dataset)

    @abc.abstractmethod
    def fit_one(self, dataset: TSDataset):
        """
        Learn the parameters from a single dataset needed by the transformer.

        Any non-abstract class inherited from this class should implement this method.

        The parameters fitted by this method is transformer-specific. For example, the
        `MinMaxScaler` needs to compute the MIN and MAX, and the `StandardScaler` needs to
        compute the MEAN and STD (standard deviation) from the dataset.

        Args:
            dataset(TSDataset): dataset from which to fit the transformer.
        """
        pass

    def transform(
        self,
        dataset: Union[TSDataset, List[TSDataset]],
        inplace: bool = False
    ) -> Union[TSDataset, List[TSDataset]]:
        """
        Apply the fitted transformer on the dataset.

        A list input is validated and every element is transformed independently through
        :func:`transform_one`; a single TSDataset is forwarded to :func:`transform_one`.

        Args:
            dataset(Union[TSDataset, List[TSDataset]]): dataset to be transformed.
            inplace(bool, optional): Set to True to perform inplace transformation. Default is False.

        Returns:
            Union[TSDataset, List[TSDataset]]: transformed dataset.
        """
        if not isinstance(dataset, list):
            return self.transform_one(dataset, inplace)

        self._check_multi_tsdataset(dataset)
        return [self.transform_one(data, inplace) for data in dataset]

    @abc.abstractmethod
    def transform_one(
        self,
        dataset: TSDataset,
        inplace: bool = False
    ) -> TSDataset:
        """
        Apply the fitted transformer on a single dataset.

        Any non-abstract class inherited from this class should implement this method.

        Args:
            dataset(TSDataset): dataset to be transformed.
            inplace(bool, optional): Set to True to perform inplace transformation. Default is False.

        Returns:
            TSDataset: transformed dataset.
        """
        pass

    def transform_n_rows(
        self,
        dataset: TSDataset,
        n_rows: int,
        inplace: bool = False,
    ) -> TSDataset:
        """
        Apply the fitted transformer on the part of the dataset.

        Args:
            dataset(TSDataset): dataset to be transformed.
            n_rows(int): n_rows to be transformed.
            inplace(bool, optional): Set to True to perform inplace transformation. Default is False.

        Returns:
            TSDataset: transformed dataset.
        """
        data_len = get_tsdataset_max_len(dataset)
        # No history needed, or the request already covers the whole dataset.
        if not self.need_previous_data or data_len == n_rows:
            return self.transform_one(dataset, inplace)

        if self.n_rows_pre_data_need == -1:
            # Transform everything, then keep the second part of the split.
            # NOTE(review): presumably the trailing n_rows rows - confirm against split_dataset.
            transformed_dataset = self.transform_one(dataset, inplace)
            _, res = split_dataset(transformed_dataset, data_len - n_rows)
        else:
            # Transform only the requested rows plus the preceding rows the transformer
            # needs, then split those preceding rows off the result.
            _, dataset = split_dataset(
                dataset, data_len - n_rows - self.n_rows_pre_data_need
            )
            transformed_dataset = self.transform_one(dataset, inplace)
            _, res = split_dataset(transformed_dataset, self.n_rows_pre_data_need)
        return res

    def fit_transform(
        self,
        dataset: Union[TSDataset, List[TSDataset]],
        inplace: bool = False
    ) -> Union[TSDataset, List[TSDataset]]:
        """
        Combine the above fit and transform into one method, firstly fitting the transformer
        from the dataset and then applying the fitted transformer on the dataset.

        Args:
            dataset(Union[TSDataset, List[TSDataset]]): dataset to process.
            inplace(bool, optional): Set to True to perform inplace transformation. Default is False.

        Returns:
            Union[TSDataset, List[TSDataset]]: transformed data.
        """
        # Do not chain on the return value of fit(): the abstract fit_one contract does
        # not require returning self, and a subclass returning None would break chaining.
        self.fit(dataset)
        return self.transform(dataset, inplace)

    def inverse_transform(
        self,
        dataset: Union[TSDataset, List[TSDataset]],
        inplace: bool = False
    ) -> Union[TSDataset, List[TSDataset]]:
        """
        Inversely transform the dataset output by the `transform` method.

        A list input is validated and every element is processed independently through
        :func:`inverse_transform_one`; a single TSDataset is forwarded to it directly.

        Differ from other abstract methods, the underlying :func:`inverse_transform_one` is
        not decorated by abc.abstractmethod. The reason is that not all the transformations
        can be transformed back inversely, thus, it is neither possible nor mandatory for
        all sub classes inherited from this base class to implement it.

        In general, other modules such as Pipeline will possibly call this method WITHOUT
        knowing if the called transform instance has implemented it. To work around this,
        the default implementation raises a NotImplementedError to enable the callers
        (e.g. Pipeline) to use try-except mechanism to identify those data transformation
        operators that do NOT implement inverse transformation.

        Args:
            dataset(Union[TSDataset, List[TSDataset]]): dataset to be inversely transformed.
            inplace(bool, optional): Set to True to perform inplace transformation. Default is False.

        Returns:
            Union[TSDataset, List[TSDataset]]: inversely transformed dataset.

        Raises:
            NotImplementedError
        """
        if not isinstance(dataset, list):
            return self.inverse_transform_one(dataset, inplace)

        self._check_multi_tsdataset(dataset)
        return [self.inverse_transform_one(data, inplace) for data in dataset]

    def inverse_transform_one(
        self,
        dataset: TSDataset,
        inplace: bool = False
    ) -> TSDataset:
        """
        Inversely transform a single dataset output by the `transform` method.

        Differ from other abstract methods, this method is not decorated by
        abc.abstractmethod. The reason is that not all the transformations can be
        transformed back inversely, thus, it is neither possible nor mandatory for all sub
        classes inherited from this base class to implement this method.

        Instead of simply using `pass` expression as the default placeholder, this method
        raises a NotImplementedError to enable the callers (e.g. Pipeline) to use try-except
        mechanism to identify those data transformation operators that do NOT implement
        this method.

        Args:
            dataset(TSDataset): dataset to be inversely transformed.
            inplace(bool, optional): Set to True to perform inplace transformation. Default is False.

        Returns:
            TSDataset: inversely transformed dataset.

        Raises:
            NotImplementedError
        """
        raise NotImplementedError
class UdBaseTransform(BaseTransform):
    """
    User define base transform.

    Wraps a user-defined or third-party transformer object and applies it to selected
    columns of a TSDataset. Subclasses implement `_fit` and `_transform` (and optionally
    `_inverse_transform`) on top of `self._ud_transformer`.

    Args:
        ud_transformer(object): User define or third-party transformer object.
        in_col_names(Optional[Union[str, List[str]]]): Column name or names to be transformed.
            When None, all columns of the dataset passed to `fit_one` are used.
        per_col_transform(bool): Whether each column of data is transformed independently, default False.
        drop_origin_columns(bool): Whether to delete the original column, default=False.
        out_col_types(Optional[Union[str, List[str]]]): The type of output columns, None values
            represent automatic inference based on input.
        out_col_names(Optional[List[str]]): The name of output columns, None values represent
            automatic inference based on input.
    """

    def __init__(
        self,
        ud_transformer: object,
        in_col_names: Optional[Union[str, List[str]]]=None,
        per_col_transform: bool=False,
        drop_origin_columns: bool=False,
        out_col_types: Optional[Union[str, List[str]]]=None,
        out_col_names: Optional[List[str]]=None,
    ):
        super().__init__()
        self._ud_transformer = ud_transformer
        self._drop_origin_columns = drop_origin_columns
        self._out_col_types = out_col_types
        self._out_col_names = out_col_names
        # A single column name is normalised to a one-element list; None is kept and
        # resolved to "all columns" in fit_one.
        self._cols = [in_col_names] if isinstance(in_col_names, str) else in_col_names
        self._fitted = False
        self._per_col_transform = per_col_transform
        if self._per_col_transform:
            # Maps column name -> transformer fitted on that column (a dict despite the name).
            self._ud_transformer_col_list = {}

    @staticmethod
    def _count_output_columns(output: np.ndarray) -> int:
        """
        Return the number of columns represented by the transformer output.

        A 1-D array is a single column of values (its length is the row count, which
        `_check_output` compares against the input rows), so it counts as one column.
        """
        return output.shape[1] if len(output.shape) >= 2 else 1

    def _check_output(
        self,
        raw_dataset: TSDataset,
        input: pd.DataFrame,
        output: np.ndarray
    ):
        """
        Check the legitimacy of the output.

        Args:
            raw_dataset(TSDataset): dataset to be transformed.
            input(pd.DataFrame): The input of ud transformer base on the raw_dataset.
            output(np.ndarray): The output of ud transformer.

        Returns:
            None

        Raises:
            ValueError
        """
        raise_if_not(
            input.shape[0] == output.shape[0],
            "The row of input is not equal to the row of output!"
        )
        output_col_num = self._count_output_columns(output)

        if self._out_col_names:
            if isinstance(self._out_col_names, list):
                raise_if_not(
                    len(self._out_col_names) == output_col_num,
                    "The out_col_names does not match the actual output!"
                )

        def check_start_time():
            # True when every input column starts at the same time point.
            start_time_set = set(
                raw_dataset.get_item_from_column(column).start_time
                for column in input.columns
            )
            return len(start_time_set) == 1

        def get_input_col_type():
            # Distinct column types among the input columns.
            return set(raw_dataset.columns[column] for column in input.columns)

        if self._out_col_types:
            raise_if_not(
                check_start_time(),
                "The start time point of input cols is different!"
            )
            if isinstance(self._out_col_types, list):
                raise_if_not(
                    len(self._out_col_types) == output_col_num,
                    "The out_col_types does not match the actual output"
                )
                for col_type in self._out_col_types:
                    raise_if(
                        col_type not in TSDATASET_COL_TYPES,
                        f"Invalid col type: {col_type}"
                    )
        else:
            # Without explicit output types, mixed input types are only accepted when the
            # output replaces the input columns one-to-one.
            if len(get_input_col_type()) != 1:
                raise_if_not(
                    input.shape == output.shape and \
                    check_start_time() and \
                    self._drop_origin_columns,
                    "The type of input column must be the same in this case!"
                )

    def _infer_output_column_types(
        self,
        raw_dataset: TSDataset,
        input: pd.DataFrame,
    )-> Union[str, List[str]]:
        """
        Infer output column's types.

        Args:
            raw_dataset(TSDataset): dataset to be transformed.
            input(pd.DataFrame): The input of ud transformer base on the raw_dataset.

        Returns:
            out_col_types(Union[str, List[str]]): the configured `out_col_types` when set;
                otherwise the single shared type of the input columns, or the list of
                per-column input types when they differ.
        """
        if self._out_col_types:
            return self._out_col_types

        input_types = [raw_dataset.columns[column] for column in input.columns]
        if len(set(input_types)) == 1:
            return input_types[0]
        return input_types

    def _get_output_column_names(
        self,
        input: pd.DataFrame,
        output: np.ndarray
    )-> List[str]:
        """
        Get output column's names.

        Args:
            input(pd.DataFrame): The input of ud transformer base on the raw_dataset.
            output(np.ndarray): The output of ud transformer.

        Returns:
            out_col_names(List[str]): the configured `out_col_names` when set; the input
                column names when the output replaces the input one-to-one; otherwise
                generated names of the form `<TransformerClass>_<col-col-...>_<i>`.
        """
        if self._out_col_names:
            return self._out_col_names

        if input.shape == output.shape and self._drop_origin_columns:
            return list(input.columns)

        name_prefix = "_".join(
            [
                self._ud_transformer.__class__.__name__,
                "-".join([column for column in input.columns])
            ]
        )
        output_col_num = self._count_output_columns(output)
        return [f"{name_prefix}_{i}" for i in range(output_col_num)]

    def _gen_input(
        self,
        raw_dataset: TSDataset,
        cols: Union[str, List[str]],
        strict: bool=True
    )->pd.DataFrame:
        """
        Generate the input to ud transformer base on raw_dataset.

        Args:
            raw_dataset(TSDataset): dataset to be transformed.
            cols(Union[str, List[str]]): The input col names.
            strict(bool): Strict matching or not. When False, names missing from the
                dataset are silently skipped and an error is raised only if none match.

        Returns:
            input(pd.DataFrame)
        """
        cols = cols if isinstance(cols, list) else [cols]
        if not strict:
            cols = [col for col in cols if col in raw_dataset.columns]
            raise_if(
                len(cols) == 0,
                "No cols was matched!"
            )
        frame = raw_dataset[cols]
        if isinstance(frame, pd.Series):
            frame = frame.to_frame()
        return frame

    def _gen_output(
        self,
        raw_output
    )->np.ndarray:
        """
        Generate the np.ndarray output base on the raw_output from ud transform.

        Args:
            raw_output: raw_output from ud transform; only np.ndarray is accepted.

        Returns:
            output(np.ndarray)

        Raises:
            TypeError: reported through `raise_log` when raw_output is not an np.ndarray.
        """
        if isinstance(raw_output, np.ndarray):
            return raw_output
        raise_log(
            TypeError(f"Invalid output type: {type(raw_output)}")
        )

    @log_decorator
    def fit_one(self, dataset: TSDataset):
        """
        Learn the parameters from the dataset needed by the transformer.

        Args:
            dataset(TSDataset): dataset from which to fit the transformer

        Returns:
            self
        """
        if self._cols is None:
            self._cols = list(dataset.columns.keys())

        if self._per_col_transform:
            # Every column gets its own deep copy of the transformer as it was before
            # this call, so fitting one column does not influence the others.
            template = self._ud_transformer
            for col in self._cols:
                self._ud_transformer = copy.deepcopy(template)
                self._fit(self._gen_input(dataset, col))
                self._ud_transformer_col_list[col] = self._ud_transformer
        else:
            self._fit(self._gen_input(dataset, self._cols))

        self._fitted = True
        return self

    def _transform_logic(
        self,
        dataset: TSDataset,
        cols: Union[str, List[str]],
        transform_func: Callable
    ) -> TSDataset:
        """
        Transform or inverse_transform the dataset with the fitted transformer.

        The given dataset is modified in place: output columns are written with
        `set_column`, and input columns are dropped when `drop_origin_columns` is set and
        they were not overwritten by an output column.

        Args:
            dataset(TSDataset): dataset to be transformed.
            cols(Union[str, List[str]]): The input col names.
            transform_func(Callable): The transform function.

        Returns:
            TSDataset
        """
        cols = cols if isinstance(cols, list) else [cols]
        input = self._gen_input(dataset, cols)
        output = self._gen_output(transform_func(input))
        if output.ndim == 1:
            # A 1-D result is a single output column; make that explicit so the
            # shape-based checks and the column indexing below treat it uniformly.
            output = output.reshape(-1, 1)
        self._check_output(dataset, input, output)
        col_names = self._get_output_column_names(input, output)
        col_types = self._infer_output_column_types(dataset, input)
        insert_col_name = []

        def set_columns(frame, col_type):
            # Write every column of `frame` into the dataset and remember its name.
            for col in frame.columns:
                dataset.set_column(col, frame[col], col_type)
                insert_col_name.append(col)

        def gen_index():
            # Rebuild a time index for the output, anchored at the first input column's start.
            start_time = dataset.get_item_from_column(cols[0]).start_time
            if isinstance(dataset.freq, str):
                return pd.date_range(
                    start=start_time,
                    periods=output.shape[0],
                    freq=dataset.freq
                )
            return pd.RangeIndex(
                start=start_time,
                stop=start_time + output.shape[0] * dataset.freq,
                step=dataset.freq
            )

        time_index = gen_index()
        if isinstance(col_types, str):
            # One shared type: insert all output columns at once.
            tmp_output = pd.DataFrame(output, index=time_index, columns=col_names)
            set_columns(tmp_output, col_types)
        else:
            # Per-column types: insert each output column with its own type.
            for i in range(output.shape[1]):
                tmp_output = pd.DataFrame(
                    output[:, i], index=time_index, columns=[col_names[i]]
                )
                set_columns(tmp_output, col_types[i])

        if self._drop_origin_columns:
            for col in input.columns:
                if col not in insert_col_name:
                    dataset.drop(col)
        return dataset

    @log_decorator
    def transform_one(
        self,
        dataset: TSDataset,
        inplace: bool = False
    ) -> TSDataset:
        """
        Transform the dataset with the fitted transformer.

        Args:
            dataset(TSDataset): dataset to be transformed.
            inplace(bool): whether to replace the original data. default=False

        Returns:
            TSDataset
        """
        new_ts = dataset if inplace else dataset.copy()
        if not self._per_col_transform:
            return self._transform_logic(new_ts, self._cols, self._transform)

        for col in self._cols:
            # Columns seen at fit time but absent from this dataset are skipped.
            if col not in dataset.columns:
                continue
            self._ud_transformer = self._ud_transformer_col_list[col]
            self._transform_logic(new_ts, col, self._transform)
        return new_ts

    @log_decorator
    def inverse_transform_one(
        self,
        dataset: TSDataset,
        inplace: bool=False
    ) -> TSDataset:
        """
        Inversely transform the dataset output by the `transform` method.

        Args:
            dataset(TSDataset): dataset to be inversely transformed.
            inplace(bool): Set to True to perform inplace operation and avoid data copy.

        Returns:
            TSDataset: Inversely transformed TSDataset.
        """
        new_ts = dataset if inplace else dataset.copy()
        if not self._per_col_transform:
            # Operate on new_ts (not dataset) so that inplace=False leaves the caller's
            # dataset untouched, matching transform_one.
            return self._transform_logic(new_ts, self._cols, self._inverse_transform)

        for col in self._cols:
            # Columns seen at fit time but absent from this dataset are skipped.
            if col not in dataset.columns:
                continue
            self._ud_transformer = self._ud_transformer_col_list[col]
            self._transform_logic(new_ts, col, self._inverse_transform)
        return new_ts

    @abc.abstractmethod
    def _fit(self, input: pd.DataFrame):
        """
        Learn the parameters from the dataset needed by the transformer.

        Args:
            input(pd.DataFrame): The input to transformer.

        Returns:
            None
        """
        pass

    @abc.abstractmethod
    def _transform(
        self,
        input: pd.DataFrame
    ):
        """
        Transform the dataset with the fitted transformer.

        Args:
            input(pd.DataFrame): The input to transformer.
        """
        pass

    def _inverse_transform(
        self,
        input: pd.DataFrame
    ):
        """
        Inversely transform the dataset output by the `transform` method.

        Not abstract: subclasses that cannot be inverted keep this default.

        Args:
            input(pd.DataFrame): The input to transformer.

        Raises:
            NotImplementedError
        """
        raise NotImplementedError