Source code for paddlets.models.forecasting.ml.ml_model_wrapper

# !/usr/bin/env python3
# -*- coding:utf-8 -*-

import abc
import numpy as np
import sklearn
from typing import Type, Dict, Any, Optional, Callable, Tuple

from paddlets.models.base import BaseModel
from paddlets.models.forecasting.ml.ml_base import MLBaseModel
from paddlets.models.forecasting.ml.adapter.data_adapter import DataAdapter
from paddlets.models.forecasting.ml.adapter.ml_dataset import MLDataset
from paddlets.models.forecasting.ml.adapter.ml_dataloader import MLDataLoader
from paddlets.models.utils import to_tsdataset
from paddlets.datasets import TSDataset
from paddlets.logger import raise_log, raise_if, raise_if_not


class MLModelBaseWrapper(MLBaseModel, metaclass=abc.ABCMeta):
    """
    Base wrapper holding the configuration shared by time series wrappers around third party models.

    Args:
        model_class(Type): Class type of the third party model.
        in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model.
        out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model.
        skip_chunk_len(int, optional): The number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample. By
            default, it will NOT skip any time steps.
        sampling_stride(int, optional): Time steps to stride over the i-th sample and (i+1)-th sample. More
            precisely, let `t` be the time index of target time series, `t[i]` be the start time of the i-th
            sample, `t[i+1]` be the start time of the (i+1)-th sample, then `sampling_stride` represents the
            result of `t[i+1] - t[i]`.
        model_init_params(Dict[str, Any]): All params for initializing the third party model.
        fit_params(Dict[str, Any], optional): All params for fitting third party model except x_train / y_train.
        predict_params(Dict[str, Any], optional): All params for forecasting third party model except
            x_test / y_test.
    """
    def __init__(
        self,
        model_class: Type,
        in_chunk_len: int,
        out_chunk_len: int,
        skip_chunk_len: int = 0,
        sampling_stride: int = 1,
        model_init_params: Dict[str, Any] = None,
        fit_params: Dict[str, Any] = None,
        predict_params: Dict[str, Any] = None
    ):
        super().__init__(
            in_chunk_len=in_chunk_len,
            out_chunk_len=out_chunk_len,
            skip_chunk_len=skip_chunk_len
        )

        self._model_class = model_class
        self._sampling_stride = sampling_stride

        # A parameter dict that was not supplied is replaced by a fresh empty dict, so the
        # attributes below can always be unpacked with ** by subclasses.
        self._model_init_params = {} if model_init_params is None else model_init_params
        self._fit_params = {} if fit_params is None else fit_params
        self._predict_params = {} if predict_params is None else predict_params
class SklearnModelWrapper(MLModelBaseWrapper):
    """
    Time series model wrapper for sklearn third party models.

    Only single step forecasting (out_chunk_len == 1) on a single target column is accepted; see the
    checks in `__init__`, `_validate_train_data` and `_validate_predict_data`.

    Args:
        model_class(Type): Class type of the third party model.
        in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model.
        out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model.
        skip_chunk_len(int, optional): The number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample. By
            default, it will NOT skip any time steps.
        sampling_stride(int, optional): Time steps to stride over the i-th sample and (i+1)-th sample. More
            precisely, let `t` be the time index of target time series, `t[i]` be the start time of the i-th
            sample, `t[i+1]` be the start time of the (i+1)-th sample, then `sampling_stride` represents the
            result of `t[i+1] - t[i]`.
        model_init_params(Dict[str, Any]): All params for initializing the third party model.
        fit_params(Dict[str, Any], optional): All params for fitting third party model except x_train / y_train.
        predict_params(Dict[str, Any], optional): All params for forecasting third party model except
            x_test / y_test.
        udf_ml_dataloader_to_fit_ndarray(Callable, optional): User defined function for converting MLDataLoader
            object to a numpy.ndarray object that can be processed by `fit` method of the third party model.
            Falls back to `default_ml_dataloader_to_fit_ndarray` when not given.
        udf_ml_dataloader_to_predict_ndarray(Callable, optional): User defined function for converting
            MLDataLoader object to a numpy.ndarray object that can be processed by `predict` method of the third
            party model. Falls back to `default_ml_dataloader_to_predict_ndarray` when not given.
    """
    def __init__(
        self,
        model_class: Type,
        in_chunk_len: int,
        out_chunk_len: int,
        skip_chunk_len: int = 0,
        sampling_stride: int = 1,
        model_init_params: Dict[str, Any] = None,
        fit_params: Dict[str, Any] = None,
        predict_params: Dict[str, Any] = None,
        udf_ml_dataloader_to_fit_ndarray: Optional[Callable] = None,
        udf_ml_dataloader_to_predict_ndarray: Optional[Callable] = None
    ):
        # Validate the chunk configuration before anything is stored on the instance.
        raise_if(in_chunk_len < 0, f"in_chunk_len ({in_chunk_len}) must >= 0.")
        raise_if(skip_chunk_len < 0, f"skip_chunk_len ({skip_chunk_len}) must >= 0.")
        raise_if(
            out_chunk_len != 1,
            f"out_chunk_len ({out_chunk_len}) must == 1. "
            f"Please refer to {BaseModel}.recursive_predict for multistep time series forecasting."
        )
        raise_if(sampling_stride < 1, f"sampling_stride ({sampling_stride}) must >= 1.")

        super(SklearnModelWrapper, self).__init__(
            in_chunk_len=in_chunk_len,
            skip_chunk_len=skip_chunk_len,
            out_chunk_len=out_chunk_len,
            model_class=model_class,
            sampling_stride=sampling_stride,
            model_init_params=model_init_params,
            fit_params=fit_params,
            predict_params=predict_params
        )

        raise_if(self._model_class is None, "model_class must not be None.")
        raise_if_not(isinstance(self._model_class, type), "isinstance(model_class, type) must be True.")

        self._model = self._init_model()
        self._data_adapter = self._init_data_adapter()

        # User defined converters take precedence over the module level defaults.
        self._udf_ml_dataloader_to_fit_ndarray = (
            udf_ml_dataloader_to_fit_ndarray
            if udf_ml_dataloader_to_fit_ndarray is not None
            else default_ml_dataloader_to_fit_ndarray
        )
        self._udf_ml_dataloader_to_predict_ndarray = (
            udf_ml_dataloader_to_predict_ndarray
            if udf_ml_dataloader_to_predict_ndarray is not None
            else default_ml_dataloader_to_predict_ndarray
        )

    def fit(self, train_data: TSDataset, valid_data: Optional[TSDataset] = None) -> None:
        """
        Fit a machine learning model.

        Args:
            train_data(TSDataset): training dataset.
            valid_data(TSDataset, optional): validation dataset. Not used by this wrapper.

        Raises:
            ValueError: (via raise_log) if the training data cannot be converted to numpy arrays, or if the
                wrapped model has no callable `fit` method.
        """
        self._validate_train_data(train_data)

        train_ml_dataset = self._tsdataset_to_ml_dataset(train_data)
        train_ml_dataloader = self._ml_dataset_to_ml_dataloader(train_ml_dataset)

        train_x, train_y = None, None
        try:
            train_x, train_y = self._udf_ml_dataloader_to_fit_ndarray(
                ml_dataloader=train_ml_dataloader,
                model_init_params=self._model_init_params,
                in_chunk_len=self._in_chunk_len,
                out_chunk_len=self._out_chunk_len,
                skip_chunk_len=self._skip_chunk_len
            )
        except Exception as e:
            # The converter may be user supplied, so any failure is reported as a conversion error.
            raise_log(
                ValueError(
                    "failed to convert train_data to sklearn model trainable numpy array. "
                    f"Please check udf_ml_dataloader_to_fit_ndarray function. Error: {e}"
                )
            )

        if callable(getattr(self._model, "fit", None)):
            self._model.fit(train_x, train_y, **self._fit_params)
            return
        raise_log(ValueError(f"{self._model_class} must implement callable fit method."))

    @to_tsdataset
    def predict(self, tsdataset: TSDataset) -> TSDataset:
        """
        Make prediction.

        Args:
            tsdataset(TSDataset): TSDataset to predict.

        Returns:
            TSDataset: TSDataset with predictions.
                NOTE(review): this method body returns the raw output of the wrapped model's `predict`;
                the conversion to TSDataset is presumably done by the `to_tsdataset` decorator - confirm.

        Raises:
            ValueError: (via raise_log) if the data cannot be converted to a numpy array, or if the wrapped
                model has no callable `predict` method.
        """
        self._validate_predict_data(tsdataset)

        # Both window bounds are the same index, located skip_chunk_len + out_chunk_len steps after the
        # last known target step.
        # NOTE(review): presumably this makes the adapter build exactly one sample whose output chunk
        # directly follows the known target - confirm against DataAdapter.to_ml_dataset.
        boundary = len(tsdataset.get_target().time_index) - 1 + self._skip_chunk_len + self._out_chunk_len
        time_window = (boundary, boundary)

        test_ml_dataset = self._tsdataset_to_ml_dataset(tsdataset, time_window=time_window)
        test_ml_dataloader = self._ml_dataset_to_ml_dataloader(test_ml_dataset)

        test_x = None
        try:
            # The second element (optional y) returned by the converter is not needed for prediction.
            test_x, _ = self._udf_ml_dataloader_to_predict_ndarray(
                ml_dataloader=test_ml_dataloader,
                model_init_params=self._model_init_params,
                in_chunk_len=self._in_chunk_len,
                out_chunk_len=self._out_chunk_len,
                skip_chunk_len=self._skip_chunk_len
            )
        except Exception as e:
            # The converter may be user supplied, so any failure is reported as a conversion error.
            raise_log(
                ValueError(
                    "failed to convert tsdataset to sklearn model predictable numpy array. "
                    f"Please check udf_ml_dataloader_to_predict_ndarray function. Error: {e}"
                )
            )

        if callable(getattr(self._model, "predict", None)):
            return self._model.predict(test_x, **self._predict_params)
        raise_log(ValueError(f"original model {self._model_class} must have callable predict method."))

    def _init_model(self) -> sklearn.base.BaseEstimator:
        """
        Internal method, init sklearn model.

        1) The model class must be inherited from sklearn.base.BaseEstimator.

        2) The initialized model object (or its ancestor) must implement fit and predict callable method.

        Returns:
            sklearn.base.BaseEstimator: Initialized sklearn model object.
        """
        # 1 model must be inherited from sklearn.base.BaseEstimator.
        expected_parent_class = sklearn.base.BaseEstimator
        parent_classes = self._model_class.mro()
        raise_if_not(
            expected_parent_class in parent_classes,
            f"{self._model_class} must inherit from {expected_parent_class}, but actual inherit chain: {parent_classes}"
        )

        model = None
        try:
            model = self._model_class(**self._model_init_params)
        except Exception as e:
            # all other possible errors are captured here:
            # (1) TypeError: __init__() got an unexpected keyword argument "xxx"
            raise_log(
                ValueError(
                    f"init model failed: {self._model_class}, model_init_params: {self._model_init_params}, error: {e}."
                )
            )

        # 2 the initialized object must expose callable fit and predict methods.
        # It is possible that the class has "predict" method, but the initialized model object does NOT have one.
        # Given this scenario, we need to check the initialized object rather than the class type.
        # For example:
        # >>> from sklearn.neighbors import LocalOutlierFactor
        # >>> model = LocalOutlierFactor()
        # >>> hasattr(model, "predict")
        # False
        # >>> hasattr(model.__class__, "predict")
        # True
        not_implemented_method_set = {
            method for method in ("fit", "predict")
            if not callable(getattr(model, method, None))
        }
        raise_if(
            len(not_implemented_method_set) > 0,
            f"The initialized {self._model_class} object must implement {not_implemented_method_set} methods."
        )
        return model

    def _init_data_adapter(self) -> DataAdapter:
        """
        Internal method, initialize data adapter.

        Returns:
            DataAdapter: Initialized data adapter object.
        """
        return DataAdapter()

    def _tsdataset_to_ml_dataset(
        self,
        tsdataset: TSDataset,
        time_window: Optional[Tuple[int, int]] = None
    ) -> MLDataset:
        """
        Internal method, convert TSDataset to MLDataset.

        Args:
            tsdataset(TSDataset): Dataset to be converted.
            time_window(Tuple[int, int], optional): Passed through unchanged to the data adapter.

        Returns:
            MLDataset: Converted MLDataset object.
        """
        return self._data_adapter.to_ml_dataset(
            rawdataset=tsdataset,
            in_chunk_len=self._in_chunk_len,
            out_chunk_len=self._out_chunk_len,
            skip_chunk_len=self._skip_chunk_len,
            sampling_stride=self._sampling_stride,
            time_window=time_window
        )

    def _ml_dataset_to_ml_dataloader(self, ml_dataset: MLDataset) -> MLDataLoader:
        """
        Internal method, convert MLDataset to MLDataLoader.

        The batch size is set to the dataset length, so the whole dataset is requested as one batch.

        Args:
            ml_dataset(MLDataset): Dataset to be converted.

        Returns:
            MLDataLoader: Converted MLDataLoader object.
        """
        return self._data_adapter.to_ml_dataloader(ml_dataset, batch_size=len(ml_dataset))

    def _validate_train_data(self, train_data: TSDataset) -> None:
        """
        Internal method, validate training data. Raises if invalid.

        Args:
            train_data(TSDataset): Training data to be validated.
        """
        raise_if(train_data is None, "training dataset must not be None.")
        raise_if(train_data.get_target() is None, "target timeseries must not be None.")

        target_ts = train_data.get_target()
        # multi target is NOT supported.
        raise_if(
            len(target_ts.columns) > 1,
            f"training dataset target timeseries columns number ({len(target_ts.columns)}) must be 1."
        )

    def _validate_predict_data(self, test_data: TSDataset) -> None:
        """
        Internal method, validate data for prediction. Raises if invalid.

        Args:
            test_data(TSDataset): Predict data to be validated.
        """
        raise_if(test_data is None, "The dataset to be predicted must not be None.")
        raise_if(test_data.get_target() is None, "target timeseries to be predicted must not be None.")

        target_ts = test_data.get_target()
        # multi target is NOT supported.
        raise_if(
            len(target_ts.columns) > 1,
            f"columns number of target timeseries to be predicted ({len(target_ts.columns)}) must be 1."
        )
def default_ml_dataloader_to_fit_ndarray(
    ml_dataloader: MLDataLoader,
    model_init_params: Dict[str, Any],
    in_chunk_len: int,
    skip_chunk_len: int,
    out_chunk_len: int
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """
    Default function for converting MLDataLoader to a numpy array that can be used for fitting the sklearn model.

    Only the first batch of the dataloader is consumed. Each of the 3-dimensional arrays in that batch is
    flattened to 2 dimensions (one row per sample) and the results are stacked column-wise in the order
    known_cov, observed_cov, past_target. past_target is left out when in_chunk_len < 1.

    Args:
        ml_dataloader(MLDataLoader): MLDataLoader object to be converted.
        model_init_params(Dict): parameters when initializing sklearn models, possibly be used while converting.
            Not used by this default implementation.
        in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model.
            Possibly be used while converting.
        skip_chunk_len(int, optional): The number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample. By
            default, it will NOT skip any time steps. Possibly be used while converting. Not used by this
            default implementation.
        out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model.
            Possibly be used while converting. Not used by this default implementation.

    Returns:
        Tuple[np.ndarray, Optional[np.ndarray]]: Converted numpy array. The first and second element in the
            tuple represent x_train and y_train, respectively.
    """
    data = next(ml_dataloader)

    def _flatten_per_sample(key: str) -> np.ndarray:
        # (n_samples, d1, d2) -> (n_samples, d1 * d2)
        values = data[key]
        return values.reshape(values.shape[0], values.shape[1] * values.shape[2])

    observed_cov = _flatten_per_sample('observed_cov')
    known_cov = _flatten_per_sample('known_cov')
    feature_blocks = [known_cov, observed_cov]
    if not in_chunk_len < 1:
        feature_blocks.append(_flatten_per_sample('past_target'))
    x_train = np.hstack(feature_blocks)

    # data["future_target"].shape = (n_samples, out_chunk_len, target_col_num)
    # y_train.shape must be (n_samples, )
    # SklearnModelWrapper.__init__ already guarantees that out_chunk_len is equal to 1, and
    # SklearnModelWrapper.fit (via _validate_train_data) guarantees that target_col_num is equal to 1.
    #
    # Only the trailing length-1 axes are squeezed. A plain np.squeeze would also drop the sample axis
    # when the batch holds exactly one sample, producing a 0-dimensional y_train.
    future_target = np.asarray(data["future_target"])
    unit_axes = tuple(
        axis for axis in range(1, future_target.ndim) if future_target.shape[axis] == 1
    )
    y_train = np.squeeze(future_target, axis=unit_axes)
    return x_train, y_train
def default_ml_dataloader_to_predict_ndarray(
    ml_dataloader: MLDataLoader,
    model_init_params: Dict[str, Any],
    in_chunk_len: int,
    skip_chunk_len: int,
    out_chunk_len: int
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """
    Default function for converting MLDataLoader to a numpy array that can be predicted by the sklearn model.

    Only the first batch of the dataloader is consumed. Each 3-dimensional array of that batch is flattened
    to one row per sample, then the rows are stacked column-wise in the order known_cov, observed_cov,
    past_target (past_target is left out when in_chunk_len < 1).

    Args:
        ml_dataloader(MLDataLoader): MLDataLoader object to be converted.
        model_init_params(Dict): parameters when initializing sklearn models, possibly be used while converting.
        in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model.
            Possibly be used while converting.
        skip_chunk_len(int, optional): The number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample. By
            default, it will NOT skip any time steps. Possibly be used while converting.
        out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model.
            Possibly be used while converting.

    Returns:
        Tuple[np.ndarray, Optional[np.ndarray]]: Converted numpy array. The first and second element in the
            tuple represent x and y, respectively, where y is optional. This default always returns None as y.
    """
    batch = next(ml_dataloader)

    def _as_rows(key):
        # (n_samples, d1, d2) -> (n_samples, d1 * d2)
        block = batch[key]
        row_count = block.shape[0]
        row_width = block.shape[1] * block.shape[2]
        return block.reshape(row_count, row_width)

    observed_rows = _as_rows('observed_cov')
    known_rows = _as_rows('known_cov')

    columns = [known_rows, observed_rows]
    use_past_target = not (in_chunk_len < 1)
    if use_past_target:
        # past_target is only read when there is a loopback window.
        columns.append(_as_rows('past_target'))

    return np.hstack(columns), None
def make_ml_model(
    model_class: Type,
    in_chunk_len: int,
    out_chunk_len: int,
    skip_chunk_len: int = 0,
    sampling_stride: int = 1,
    model_init_params: Dict[str, Any] = None,
    fit_params: Dict[str, Any] = None,
    predict_params: Dict[str, Any] = None,
    udf_ml_dataloader_to_fit_ndarray: Optional[Callable] = None,
    udf_ml_dataloader_to_predict_ndarray: Optional[Callable] = None
) -> MLModelBaseWrapper:
    """
    Make Wrapped time series model based on the third-party model.

    The wrapper type is chosen from the top-level package of `model_class.__module__`. Only classes
    whose module lives under the `sklearn` package are handled; any other class is reported as an error.

    Args:
        model_class(Type): Class type of the third party model.
        in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model.
        out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model.
        skip_chunk_len(int, optional): The number of time steps between in_chunk and out_chunk for a single sample.
            The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample. By
            default, it will NOT skip any time steps.
        sampling_stride(int, optional): Time steps to stride over the i-th sample and (i+1)-th sample. More
            precisely, let `t` be the time index of target time series, `t[i]` be the start time of the i-th
            sample, `t[i+1]` be the start time of the (i+1)-th sample, then `sampling_stride` represents the
            result of `t[i+1] - t[i]`.
        model_init_params(Dict[str, Any]): All params for initializing the third party model.
        fit_params(Dict[str, Any], optional): All params for fitting third party model except x_train / y_train.
        predict_params(Dict[str, Any], optional): All params for forecasting third party model except
            x_test / y_test.
        udf_ml_dataloader_to_fit_ndarray(Callable, optional): User defined function for converting MLDataLoader
            object to a numpy.ndarray object that can be processed by `fit` method of the third party model. Any
            third party models that accept numpy array as fit inputs can use this function to build the data for
            training.
        udf_ml_dataloader_to_predict_ndarray(Callable, optional): User defined function for converting
            MLDataLoader object to a numpy.ndarray object that can be processed by `predict` method of the third
            party model. Any third-party models that accept numpy array as predict inputs can use this function
            to build the data for prediction.

    Returns:
        SklearnModelWrapper: Wrapped time series model wrapper object.
    """
    raise_if(model_class is None, "model_class must not be None.")
    raise_if_not(isinstance(model_class, type), "isinstance(model_class, type) must be True.")

    # Top-level package of the module that defines the class, e.g. "sklearn" for "sklearn.linear_model._base".
    root_package, _, _ = model_class.__module__.partition(".")
    if root_package == sklearn.__name__:
        wrapper_kwargs = dict(
            model_class=model_class,
            in_chunk_len=in_chunk_len,
            out_chunk_len=out_chunk_len,
            skip_chunk_len=skip_chunk_len,
            sampling_stride=sampling_stride,
            model_init_params=model_init_params,
            fit_params=fit_params,
            predict_params=predict_params,
            udf_ml_dataloader_to_fit_ndarray=udf_ml_dataloader_to_fit_ndarray,
            udf_ml_dataloader_to_predict_ndarray=udf_ml_dataloader_to_predict_ndarray
        )
        return SklearnModelWrapper(**wrapper_kwargs)

    raise_log(ValueError(f"Unable to make ml model for {model_class}."))