Source code for paddlets.models.representation.task.repr_forecasting

# !/usr/bin/env python3
# -*- coding:utf-8 -*-
from typing import Callable, Tuple

import numpy as np
from sklearn.multioutput import MultiOutputRegressor
from sklearn.linear_model import Ridge
from sklearn.base import is_regressor, clone

from paddlets.datasets.tsdataset import TSDataset
from paddlets.models.representation.dl.repr_base import ReprBaseModel
from paddlets.models.representation import TS2Vec, CoST
from paddlets.ensemble.base import EnsembleBase
from paddlets.ensemble.stacking_ensemble import StackingEnsembleBase
from paddlets.logger.logger import raise_if, Logger, raise_log
from paddlets.models.data_adapter import DataAdapter
from paddlets.models.utils import to_tsdataset
from paddlets.models import BaseModel
from paddlets.utils.utils import repr_results_to_tsdataset

logger = Logger(__name__)


[docs]class ReprForecasting(StackingEnsembleBase, BaseModel): """ The ReprForecasting Class. Args: in_chunk_len(int): The size of previous time point window to use for representation results out_chunk_len(int): The size of the forecasting horizon, i.e., the number of time steps output by the model. repr_model(ReprBasemodel): Representation model to use for forcast. skip_chunk_len(int): Optional, the number of time steps between in_chunk and out_chunk for a single sample. The skip chunk is neither used as a feature (i.e. X) nor a label (i.e. Y) for a single sample. By default, it will NOT skip any time steps. sampling_stride: Sampling intervals between two adjacent samples. repr_model_params(dict):params for reprmodel init. encode_params(dict):params for reprmodel encode, "slide_len" will set to in_chunk_len by force. downstream_learner(Callable): The downstream learner, should be a sklearn-like regressor, set to Ridge(alpha=0.5) by default. verbose(bool): Turn on Verbose mode,set to true by default. """ def __init__(self, in_chunk_len: int, out_chunk_len: int, repr_model: ReprBaseModel, skip_chunk_len: int = 0, sampling_stride: int = 1, repr_model_params: dict = None, encode_params: dict = None, downstream_learner: Callable = None, verbose: bool = False ) -> None: self._sampling_stride = sampling_stride if repr_model_params is None: repr_model_params = {} if encode_params is None: encode_params = {} raise_if(not isinstance(repr_model_params, dict), "model_params should be a params dict") raise_if(not isinstance(encode_params, dict), "encode_params should be a params dict") self._repr_model = repr_model self._repr_model_params = repr_model_params self._encode_params = encode_params BaseModel.__init__(self, in_chunk_len, out_chunk_len, skip_chunk_len) super().__init__([(repr_model, repr_model_params)], downstream_learner, verbose) def _set_params(self, estimators): """ Set estimators params Set params and initial estimators. Args: estimators(List[Tuple[object, dict]] ): A list of tuple (class,params) consisting of several paddlets models. """ if estimators[0][0] is TS2Vec: if "sliding_len" in self._encode_params: raise_if(self._encode_params["sliding_len"] != self._in_chunk_len, f" Sliding_len ({self._encode_params['sliding_len']})should \ equal to in_chunk_len{self._in_chunk_len} when use ts2vec") else: # sliding_len decide how many previous time point to use for repr res in Ts2vec self._encode_params["sliding_len"] = self._in_chunk_len - 1 elif estimators[0][0] is CoST: if "segment_size" in estimators[0][1]: raise_if(estimators[0][1]["segment_size"] != self._in_chunk_len, f"Segment_size ({self._repr_model_params['segment_size']})should \ equal to in_chunk_len{self._in_chunk_len}.(except ts2vec model)") else: estimators[0][1]["segment_size"] = self._in_chunk_len else: raise_log(f"Unsupported model type {type(self._repr_model)}") if "verbose" not in self._encode_params: self._encode_params["verbose"] = False return super()._set_params(estimators) def _check_estimators(self, estimators): """ Check estimators Check and valid estimators Args: estimators(List[Tuple[object, dict]] ): A list of tuple (class,params) consisting of several paddlets models. """ super()._check_estimators(estimators) if all([issubclass(e[0], ReprBaseModel) for e in estimators]): pass else: raise ValueError("Estimators have unsupported or uncompatible models")
[docs] def fit(self, tsdataset: TSDataset) -> None: """ fit Args: train_tsdataset(TSDataset): Train dataset. """ target_len = len(tsdataset.target) if tsdataset._observed_cov and tsdataset.observed_cov.end_time > tsdataset.target.end_time: tsdataset._observed_cov, _ = tsdataset._observed_cov.split(target_len) # TODO: future know_cov support if tsdataset.known_cov and tsdataset.known_cov.end_time > tsdataset.target.end_time: tsdataset._known_cov, _ = tsdataset._known_cov.split(target_len) super().fit(tsdataset)
def _generate_fit_meta_data(self, tsdataset: TSDataset, valid_data: TSDataset = None) -> Tuple[ np.ndarray, np.ndarray]: """ Generate meta data needed for level 2 Args: train_tsdataset(TSDataset): Train dataset. Return: Tuple[np.ndarray, np.ndarray] """ # fit repr logger.info("Repr model fit start") self._estimators[0].fit(tsdataset) logger.info("Repr model fit end") # encode encode = self._estimators[0].encode(tsdataset, **self._encode_params) # repr res to tsdataset encode_tsdataset = repr_results_to_tsdataset(encode, tsdataset) # get samples samples = DataAdapter().to_sample_dataset(encode_tsdataset, in_chunk_len=1, out_chunk_len=self._out_chunk_len, skip_chunk_len=self._skip_chunk_len, sampling_stride=self._sampling_stride).samples X_meta = [] y_meta = [] # assemble meta # currently not support known_cov for sample in samples: target = sample["future_target"] target = np.array(target).flatten() y_meta.append(target) feature = sample["observed_cov_numeric"] feature = np.array(feature).flatten() X_meta.append(feature) X_meta = np.array(X_meta) y_meta = np.array(y_meta) return X_meta, y_meta def _check_final_learner(self, final_learner) -> None: """ Check if a final learner is given and if it is valid, otherwise set default regressor. Args: final_learner(Callable):A sklearn-like regressor Returns: regressor Raises: ValueError Raise error if given regressor is not a valid sklearn-like regressor. """ if final_learner is None: final_learner = Ridge(alpha=0.5) else: if not is_regressor(final_learner): raise ValueError( f"`final learner` should be a sklearn-like regressor, " f"but found: {final_learner}" ) final_learner = clone(final_learner) return MultiOutputRegressor(final_learner)
[docs] @to_tsdataset(scenario="forecasting") def predict(self, tsdataset: TSDataset) -> TSDataset: """ Predict Args: tsdataset(TSDataset): Dataset to predict. """ tsdataset = tsdataset.copy() target_len = len(tsdataset.target) raise_if(target_len < self._in_chunk_len, f"tsdataset.target length ({target_len}) must < model._in_chunk_len ({self._in_chunk_len})") if tsdataset._observed_cov and tsdataset.observed_cov.end_time > tsdataset.target.end_time: tsdataset._observed_cov, _ = tsdataset._observed_cov.split(target_len) # TODO: future know_cov support if tsdataset.known_cov and tsdataset.known_cov.end_time > tsdataset.target.end_time: tsdataset._known_cov, _ = tsdataset._known_cov.split(target_len) if target_len > self._in_chunk_len: _, tsdataset = tsdataset.split(target_len - self._in_chunk_len) # target_num = len(tsdataset.target.columns) # y_pred = y_pred.reshape(self._out_chunk_len, target_num) return super().predict(tsdataset)
def _generate_predict_meta_data(self, tsdataset: TSDataset) -> Tuple[np.ndarray, np.ndarray]: """ Generate predict meta data needed for level 2 Args: tsdataset(TSDataset): Predcit dataset. """ encode = self._estimators[0].encode(tsdataset, **self._encode_params) encode = encode[0] X_meta = encode[-1].reshape(1, -1) return X_meta
[docs] def save(self, path: str, repr_forecaster_file_name: str = "repr-forecaster-partial.pkl") -> None: """ Save the repr-forecaster model to a directory. Args: path(str): Output directory path. ensemble_file_name(str): Name of repr-forecaster model object. This file contains meta information of repr-forecaster model. """ return super().save(path, repr_forecaster_file_name)
[docs] @staticmethod def load(path: str, repr_forecaster_file_name: str = "repr-forecaster-partial.pkl") -> "StackingEnsembleForecaster": """ Load the repr-forecaster model from a directory. Args: path(str): Input directory path. ensemble_file_name(str): Name of repr-forecaster model object. This file contains meta information of repr-forecaster model. Returns: The loaded ensemble model. """ return EnsembleBase.load(path, repr_forecaster_file_name)