Source code for paddlets.models.base

# !/usr/bin/env python3
# -*- coding:utf-8 -*-

import abc
from typing import Optional, List, Union
import numpy as np
import pandas as pd
import math

from paddlets import TSDataset, TimeSeries
from paddlets.logger import raise_if, raise_log, raise_if_not
# WARN: import paddlets.models.utils here would cause circular reference, below is raised error:
# ImportError: cannot import name 'BaseModel' from 'paddlets.models' (/home/work/paddlets/paddlets/models/__init__.py)


[docs]class Trainable(object, metaclass=abc.ABCMeta): """ Base class for all trainable classes. Any classes need to be fitted (e.g. :class:`~paddlets.models.base.BaseModel`, :class:`~paddlets.pipeline.Pipeline`, etc.) may inherit from this base class and optionally implement :func:`fit` method. """ def __init__(self): pass
[docs] @abc.abstractmethod def fit( self, train_data: TSDataset, valid_data: Optional[TSDataset] = None ): """ Fit a trainable instance. Any non-abstract classes inherited from this class should implement this method. Args: train_data(TSDataset): Training dataset. valid_data(TSDataset, optional): Validation dataset, optional. """ pass
[docs] @abc.abstractmethod def predict(self, data: TSDataset) -> TSDataset: """ Make prediction. Any non-abstract classes inherited from this class should implement this method. Args: data(TSDataset): A TSDataset for time series forecasting. Returns: TSDataset: Predicted result, in type of TSDataset. """ pass
[docs]class BaseModel(Trainable, metaclass=abc.ABCMeta): """ Base class for all machine learning and deep learning models. Args: in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model. out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model. skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample. The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample. By default, it will NOT skip any time steps. Attributes: _in_chunk_len(int): The size of the loopback window, i.e., the number of time steps feed to the model. _out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model. _skip_chunk_len(int): The length of time series chunk between past target and future target for a single sample. The skip chunk are neither used as feature (i.e. X) nor label (i.e. Y) for a single sample. """ def __init__( self, in_chunk_len: int, out_chunk_len: int, skip_chunk_len: int ): super(BaseModel, self).__init__() self._in_chunk_len = in_chunk_len self._out_chunk_len = out_chunk_len self._skip_chunk_len = skip_chunk_len def _check_multi_tsdataset(self, datasets: List[TSDataset]): """ Check the validity of multi time series combination transform Args: datasets(List[TSDataset]): Training datasets. """ raise_if( len(datasets) == 0, "The Length of datasets cannot be 0!" ) columns_set = set(tuple(sorted(dataset.columns.items())) for dataset in datasets) raise_if_not( len(columns_set) == 1, "The schema of datasets is not same! Cannot be combined for conversion!" )
[docs] @abc.abstractmethod def fit( self, train_data: TSDataset, valid_data: Optional[TSDataset] = None ): """ Fit a BaseModel instance. Any non-abstract classes inherited from this class should implement this method. Args: train_data(TSDataset): Training dataset. valid_data(TSDataset, optional): Validation dataset, optional. """ pass
[docs] @abc.abstractmethod def predict(self, data: TSDataset) -> TSDataset: """ Make prediction. Any non-abstract classes inherited from this class should implement this method. Args: data(TSDataset): A TSDataset for time series forecasting. Returns: TSDataset: Predicted result, in type of TSDataset. """ pass
[docs] @abc.abstractmethod def save(self, path: str) -> None: """ Saves a BaseModel instance to a disk file. Any non-abstract classes inherited from this class should implement this method. Args: path(str): A path string containing a model file name. """ pass
[docs] @staticmethod @abc.abstractmethod def load(path: str) -> "BaseModel": """ Loads a :class:`~/paddlets.models.base.BaseModel` instance from a file. Any non-abstract classes inherited from this class should implement this method. Args: path(str): A path string containing a model file name. Returns: BaseModel: A loaded model. """ pass
[docs] def recursive_predict( self, tsdataset: TSDataset, predict_length: int, ) -> TSDataset: """ Apply `self.predict` method iteratively for multi-step time series forecasting, the predicted results from the current call will be appended to the `TSDataset` object and will appear in the loopback window for next call. Note that each call of `self.predict` will return a result of length `out_chunk_len`, so it will be called ceiling(`predict_length`/`out_chunk_len`) times to meet the required length. Args: tsdataset(TSDataset): Data to be predicted. predict_length(int): Length of predicted results. Returns: TSDataset: Predicted results. """ # Not supported when _skip_chunk !=0 raise_if(self._skip_chunk_len != 0, f"recursive_predict not supported when \ _skip_chunk_len!=0, got {self._skip_chunk_len}.") # raise_if(predict_length < self._out_chunk_len, f"predict_length must be >= \ # self._out_chunk_len, got {predict_length}.") raise_if(predict_length <= 0, f"predict_length must be > \ 0, got {predict_length}.") tsdataset_copy = tsdataset.copy() # Preprocess tsdataset if isinstance(tsdataset.get_target().data.index, pd.RangeIndex): dataset_end_time = max( tsdataset_copy.get_target().end_time + \ math.ceil(predict_length / self._out_chunk_len) * self._out_chunk_len * \ (tsdataset_copy.get_target().time_index.step), tsdataset_copy.get_known_cov().end_time \ if tsdataset_copy.get_known_cov() is not None \ else tsdataset_copy.get_target().start_time, tsdataset_copy.get_observed_cov().end_time \ if tsdataset_copy.get_observed_cov() is not None \ else tsdataset_copy.get_target().start_time ) elif isinstance(tsdataset.get_target().data.index, pd.DatetimeIndex): dataset_end_time = max( tsdataset_copy.get_target().end_time + \ math.ceil(predict_length / self._out_chunk_len) * self._out_chunk_len * \ (tsdataset_copy.get_target().time_index.freq), tsdataset_copy.get_known_cov().end_time \ if tsdataset_copy.get_known_cov() is not None \ else tsdataset_copy.get_target().start_time, tsdataset_copy.get_observed_cov().end_time \ if tsdataset_copy.get_observed_cov() is not None \ else tsdataset_copy.get_target().start_time ) else: raise_log(ValueError(f"time col type not support, \ index type:{type(tsdataset.get_target().data.index)}")) # Reindex data and the default fill value is np.nan fill_value = np.nan if tsdataset_copy.get_known_cov() is not None: if isinstance(tsdataset_copy.get_known_cov().data.index, pd.RangeIndex): tsdataset_copy.get_known_cov().reindex( pd.RangeIndex(start=tsdataset_copy.get_known_cov().start_time, stop=dataset_end_time + 1, step=tsdataset_copy.get_known_cov().time_index.step), fill_value=fill_value ) else: tsdataset_copy.get_known_cov().reindex( pd.date_range(start=tsdataset_copy.get_known_cov().start_time, end=dataset_end_time, freq=tsdataset_copy.get_known_cov().time_index.freq), fill_value=fill_value ) if tsdataset_copy.get_observed_cov() is not None: if isinstance(tsdataset_copy.get_observed_cov().data.index, pd.RangeIndex): tsdataset_copy.get_observed_cov().reindex( pd.RangeIndex(start=tsdataset_copy.get_observed_cov().start_time, stop=dataset_end_time + 1, step=tsdataset_copy.get_observed_cov().time_index.step), fill_value=fill_value ) else: tsdataset_copy.get_observed_cov().reindex( pd.date_range(start=tsdataset_copy.get_observed_cov().start_time, end=dataset_end_time, freq=tsdataset_copy.get_observed_cov().time_index.freq), fill_value=fill_value ) return self._recursive_predict(tsdataset_copy, predict_length)
def _recursive_predict( self, tsdataset: TSDataset, predict_length: int, ) -> np.ndarray: """ Recursive predict core. Args: tsdataset(TSDataset): Data to be predicted. predict_length(int): Length of predicted results. Returns: TSDataset: Predicted results. """ recursive_rounds = math.ceil(predict_length / self._out_chunk_len) results = [] for _ in range(recursive_rounds): # Model predict output = self.predict(tsdataset) # Update data using predicted value tsdataset = TSDataset.concat([tsdataset, output]) results.append(output) # Concat results result = TSDataset.concat(results) # Resize result result.set_target( TimeSeries(result.get_target().data[0: predict_length], result.freq) ) return result