Source code for paddlets.datasets.tsdataset

# !/usr/bin/env python3
# -*- coding:utf-8 -*-

"""
TSDataset is the fundamental data class in PaddleTS, which is designed as the first-class citizen 
to represent the time series data. It is widely used in PaddleTS. In many cases, a function consumes a TSDataset and produces another TSDataset. 
A TSDataset object is comprised of two kinds of time series data: 

	1. Target: the key time series data in the time series modeling tasks (e.g. the series that need to be forecast in time series forecasting tasks).
	2. Covariate: the relevant time series data which are usually helpful for the time series modeling tasks.

Currently, it supports the representation of:

	1. Time series of single target w/wo covariates.
	2. Time series of multiple targets w/wo covariates. 

And the covariates can be categorized into one of the following 3 types:

	1. Observed covariates (`observed_cov`): 
		referring to those variables which can only be observed in the historical data, e.g. measured temperatures

	2. Known covariates (`known_cov`):
		referring to those variables which can be determined at present for future time steps, e.g. weather forecasts

	3. Static covariates (`static_cov`):
		referring to those variables which keep constant over time

A TSDataset object includes one or more TimeSeries objects, representing targets, 
known covariates (known_cov), observed covariates (observed_cov), and static covariates (static_cov), respectively.

"""
from copy import deepcopy
import json
import math
import pickle
from typing import Any, Callable, List, Optional, Sequence, Tuple, Union, Dict

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from paddlets.logger import Logger, raise_if_not, raise_if, raise_log

logger = Logger(__name__)


class TimeSeries(object):
    """
    TimeSeries is the atomic data structure for representing target(s), observed covariates
    (observed_cov), and known covariates (known_cov). Each could be comprised of a single or
    multiple time series data.

    Args:
        data(DataFrame|Series): A Pandas DataFrame or Series containing the time series data
        freq(str|int): A string or int representing the Pandas DateTimeIndex's frequency or
            RangeIndex's step size

    Returns:
        None
    """

    def __init__(
        self,
        data: Union[pd.DataFrame, pd.Series],
        freq: Union[int, str],
    ):
        if isinstance(data, pd.Series):
            data = data.to_frame()
        raise_if_not(
            isinstance(data, pd.DataFrame),
            f"The type of param `data` must be pd.DataFrame or pd.Series, but {type(data)} received"
        )
        self._data = data
        self._freq = freq
        if isinstance(self.freq, str):
            # A string freq means a datetime index: conform the data to that frequency and
            # keep the frequency string that pandas reports for the resulting index.
            try:
                self._data = self._data.asfreq(self._freq)
                self._freq = self._data.index.freqstr
            except ValueError:
                raise_log(
                    ValueError(f"Invalid freq: {self._freq}")
                )

    @classmethod
    def load_from_dataframe(
        cls,
        data: Union[pd.DataFrame, pd.Series],
        time_col: Optional[str] = None,
        value_cols: Optional[Union[List[str], str]] = None,
        freq: Optional[Union[str, int]] = None,
        drop_tail_nan: bool = False,
        dtype: Optional[Union[type, Dict[str, type]]] = None
    ) -> "TimeSeries":
        """
        Construct a TimeSeries object from the specified columns of a DataFrame

        Args:
            data(DataFrame|Series): A Pandas DataFrame or Series containing the time series data
            time_col(str|None): The name of time column, a Pandas DatetimeIndex or RangeIndex.
                If not set, the DataFrame's index will be used.
            value_cols(list|str|None): The name of column or the list of columns from which to
                extract the time series data. If set to `None`, all columns except for the time
                column will be used as value columns.
            freq(str|int|None): A string or int representing the Pandas DateTimeIndex's frequency
                or RangeIndex's step size
            drop_tail_nan(bool): Drop time series tail nan value or not, if True, drop all `Nan`
                value after the last `non-Nan` element in the current time series.
                eg: [nan, 3, 2, nan, nan] -> [nan, 3, 2], [3, 2, nan, nan] -> [3, 2],
                [nan, nan, nan] -> []
            dtype(np.dtype|type|dict): Use a numpy.dtype or Python type to cast entire TimeSeries
                object to the same type. Alternatively, use {col: dtype, ...}, where col is a
                column label and dtype is a numpy.dtype or Python type to cast one or more of the
                DataFrame's columns to column-specific types.

        Returns:
            TimeSeries object
        """
        # get data
        series_data = None
        if value_cols is None:
            if isinstance(data, pd.Series):
                series_data = data.copy()
            else:
                series_data = data.loc[:, data.columns != time_col].copy()
        else:
            series_data = data.loc[:, value_cols].copy()

        if isinstance(series_data, pd.DataFrame):
            raise_if_not(
                series_data.columns.is_unique,
                "duplicated column names in the `data`!"
            )
        # get time_col_vals
        if time_col:
            raise_if_not(
                time_col in data.columns,
                f"The time column: {time_col} doesn't exist in the `data`!"
            )
            time_col_vals = data.loc[:, time_col]
        else:
            time_col_vals = data.index
        # Duplicated values or NaN are not allowed in the time column
        raise_if(
            time_col_vals.duplicated().any(),
            "duplicated values in the time column!"
        )
        raise_if(
            pd.isna(time_col_vals).any(),
            "NaN values in the time column!"
        )
        # Try to convert to string and generate DatetimeIndex
        if np.issubdtype(time_col_vals.dtype, np.integer) and isinstance(freq, str):
            time_col_vals = time_col_vals.astype(str)
        # get time_index
        range_index = None
        if np.issubdtype(time_col_vals.dtype, np.integer):
            if freq:
                # The type of freq should be int when the type of time_col is RangeIndex,
                # which is set to 1 by default
                raise_if_not(
                    isinstance(freq, int) and freq >= 1,
                    "The type of freq should be int when the type of time_col is RangeIndex")
            else:
                freq = 1
            start_idx, stop_idx = min(time_col_vals), max(time_col_vals) + freq
            # All integers in the range must be present
            raise_if(
                (stop_idx - start_idx) / freq != len(data),
                "The number of rows doesn't match with the RangeIndex!"
            )
            range_index = pd.RangeIndex(
                start=start_idx,
                stop=stop_idx,
                step=freq
            )
            raise_if(
                not np.array_equal(np.sort(np.asarray(time_col_vals)), range_index.to_numpy()),
                "The values of the time column don't match with the RangeIndex!"
            )
            # Rows may arrive in any order: label every row with its own time value first so
            # that sorting below keeps each value attached to the right time step.
            time_index = pd.Index(time_col_vals)
        elif np.issubdtype(time_col_vals.dtype, np.object_) or \
                np.issubdtype(time_col_vals.dtype, np.datetime64):
            time_col_vals = pd.to_datetime(time_col_vals, infer_datetime_format=True)
            time_index = pd.DatetimeIndex(time_col_vals)
            if freq:
                # freq type needs to be string when time_col type is DatetimeIndex
                raise_if_not(
                    isinstance(freq, str),
                    "The type of `freq` should be `str` when the type of `time_col` is `DatetimeIndex`."
                )
            else:
                # If freq is not provided and automatic inference fail, throw exception
                freq = pd.infer_freq(time_index)
                raise_if(
                    freq is None,
                    "Failed to infer the `freq`. A valid `freq` is required."
                )
                # A leading '-' is reported for descending input; the data is sorted below.
                if freq[0] == '-':
                    freq = freq[1:]
        else:
            raise_log(ValueError("The type of `time_col` is invalid."))
        if isinstance(series_data, pd.Series):
            series_data = series_data.to_frame()
        series_data.set_index(time_index, inplace=True)
        series_data.sort_index(inplace=True)
        if range_index is not None:
            # The sorted integer labels were verified to equal the RangeIndex values.
            series_data.index = range_index
        ts = TimeSeries(series_data, freq)
        if drop_tail_nan:
            ts.drop_tail_nan()
        if dtype:
            ts.astype(dtype)
        return ts

    @property
    def time_index(self):
        """the time index"""
        return self.data.index

    @property
    def columns(self):
        """the data columns"""
        return self.data.columns

    @property
    def start_time(self) -> Union[pd.Timestamp, int]:
        """the first value of the time index"""
        return self.time_index[0]

    @property
    def end_time(self) -> Union[pd.Timestamp, int]:
        """the last value of the time index"""
        return self.time_index[-1]

    @property
    def data(self):
        """DataFrame storing the data"""
        return self._data

    @property
    def freq(self):
        """Frequency of TimeSeries"""
        return self._freq

    @property
    def dtypes(self) -> pd.Series:
        """dtypes of TimeSeries"""
        return self._data.dtypes

    def __len__(self):
        """Length of TimeSeries"""
        return len(self._data)

    def __str__(self):
        """str"""
        return self._data.__str__()

    def __repr__(self):
        """repr"""
        return self._data.__repr__()

    def astype(self, dtype: Union[np.dtype, type, Dict[str, Union[np.dtype, type]]]) -> "TimeSeries":
        """
        Cast a TimeSeries object to the specified dtype (in place)

        Args:
            dtype(np.dtype|type|dict): Use a numpy.dtype or Python type to cast entire TimeSeries
                object to the same type. Alternatively, use {col: dtype, ...}, where col is a
                column label and dtype is a numpy.dtype or Python type to cast one or more of the
                DataFrame's columns to column-specific types.

        Returns:
            TimeSeries: self

        Raise:
            TypeError
            KeyError
        """
        self._data = self._data.astype(dtype)
        return self

    def to_dataframe(self, copy: bool = True) -> pd.DataFrame:
        """
        Return a pd.DataFrame representation of the TimeSeries object

        Args:
            copy(bool): Return a copy or reference

        Returns:
            pd.DataFrame
        """
        if copy:
            return self.data.copy()
        else:
            return self.data

    def to_numpy(self, copy: bool = True) -> np.ndarray:
        """
        Return a numpy.ndarray representation of the TimeSeries object

        Args:
            copy(bool): Return a copy or reference.
                Note that copy=False does not ensure that to_numpy() is no-copy. Rather, copy=True
                ensure that a copy is made, even if not strictly necessary.
                refer:https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_numpy.html

        Returns:
            np.ndarray
        """
        return self.data.to_numpy(copy=copy)

    def get_index_at_point(
        self,
        point: Union[pd.Timestamp, str, float, int],
        after=True
    ) -> int:
        """
        Convert a point along the time axis into an integer index.

        Args:
            point(pd.Timestamp|str|float|int): Time point, supports 3 types
                `pd.Timestamp|str`: It only takes effect when the time_index type is
                    pd.DatetimeIndex, the corresponding index is returned, and str will be
                    converted to pd.Timestamp
                `float`: the parameter will be treated as the proportion of the time series that
                    should lie before the point.
                `int`: the parameter will returned as such, provided that it is in the series.
                    Otherwise it will raise a ValueError.
            after(bool): If the provided pandas Timestamp is not in the time series index, whether
                to return the index of the next timestamp or the index of the previous one.

        Returns:
            int: index

        Raise:
            ValueError
            TypeError
        """
        point_index = -1
        if isinstance(point, str):
            point = pd.Timestamp(point)
        if isinstance(point, float):
            raise_if_not(
                0.0 <= point <= 1.0,
                "`point` (float) should be between 0.0 and 1.0."
            )
            point_index = math.floor((self.data.shape[0] - 1) * point)
        elif isinstance(point, (int, np.int64)):
            raise_if(
                point not in range(self.data.shape[0]),
                "`point` (int) should be a valid index in series."
            )
            point_index = point
        elif isinstance(point, pd.Timestamp):
            raise_if_not(
                isinstance(self.time_index, pd.DatetimeIndex),
                "The provided `point` is of the Timestamp type, but the type of time column is not DatetimeIndex"
            )
            raise_if_not(
                point >= self.start_time and point <= self.end_time,
                "The `point` is out of the valid range."
            )
            if point in self.time_index:
                point_index = self.time_index.get_loc(point)
            else:
                # Snap to the nearest existing timestamp on the requested side.
                point_index = self.time_index.get_loc(
                    next(filter(lambda t: t >= point, self.time_index))
                    if after
                    else next(filter(lambda t: t <= point, self.time_index[::-1]))
                )
        else:
            raise_log(
                TypeError(
                    "`point` needs to be either `float`, `int` or `pd.Timestamp`"
                )
            )
        return point_index

    def split(
        self,
        split_point: Union[pd.Timestamp, str, float, int],
        after=True
    ) -> Tuple["TimeSeries", "TimeSeries"]:
        """
        Split the TimeSeries object into two TimeSeries objects according to `split_point`

        Args:
            split_point(pd.Timestamp|str|float|int): Where to split the TimeSeries, which could be
                `pd.Timestamp|str`: Only valid when the type of time_index is pd.DatetimeIndex,
                    and str will be converted to pd.Timestamp
                `float`: The proportion of the length of the first TimeSeries object
                `int`: The positional index at which the second TimeSeries object starts
                For Timestamp/str/float split points, the row at the split point is included in
                the first TimeSeries object.
            after(bool): If `split_point` (pd.Timestamp) doesn't exist in the time index, use the
                next valid index (True) or the previous one (False)

        Returns:
            Tuple["TimeSeries", "TimeSeries"]

        Raise:
            ValueError
            TypeError
        """
        point = self.get_index_at_point(split_point, after)
        # An int split point is used as the cut position itself; any other kind of split point
        # keeps the located row in the first half.
        shift = 0 if isinstance(split_point, (int, np.int64)) else 1
        return (
            TimeSeries(self.data.iloc[: point + shift, :], self.freq),
            TimeSeries(self.data.iloc[point + shift:, :], self.freq)
        )

    def copy(self) -> "TimeSeries":
        """
        Make a copy of the TimeSeries object

        Returns:
            TimeSeries
        """
        return TimeSeries(self.data.copy(), self.freq)

    def __getitem__(
        self,
        key: Union[
            pd.DatetimeIndex,
            pd.RangeIndex,
            slice,
        ],
    ) -> "TimeSeries":
        """
        Indexing operation on the TimeSeries object

        Args:
            key(pd.DatetimeIndex|pd.RangeIndex|slice):
                `pd.DatetimeIndex`: Only valid when the type of time_index is pd.DatetimeIndex,
                    return a sub TimeSeries according to pd.DatetimeIndex
                `pd.RangeIndex`: Only valid when the type of time_index is pd.RangeIndex,
                    return a sub TimeSeries according to pd.RangeIndex
                `slice`: return a sub TimeSeries by the `slice`, e.g. timeseries[10:20] returns a
                    sub TimeSeries of length 10

        Returns:
            TimeSeries

        Raise:
            ValueError
        """
        if isinstance(key, pd.DatetimeIndex):
            raise_if_not(
                isinstance(self._data.index, pd.DatetimeIndex),
                f"The TimeSeries' index is of the type {type(self._data.index)}, but the key is of the type pd.DatetimeIndex"
            )
            return self.__class__(self._data.loc[key], freq=key.freqstr)
        elif isinstance(key, pd.RangeIndex):
            raise_if_not(
                isinstance(self._data.index, pd.RangeIndex),
                f"The TimeSeries' index is of the type {type(self._data.index)}, but the key is of the type pd.RangeIndex"
            )
            return self.__class__(self._data.loc[key], freq=key.step)
        elif isinstance(key, slice):
            return self.__class__(self._data[key], freq=self.freq)
        raise_log(ValueError(f"Invalid type of `key`: {type(key)}, currently only `pd.DatetimeIndex`, `pd.RangeIndex`, and `slice` are supported"))

    @classmethod
    def concat(
        cls,
        tss: List["TimeSeries"],
        axis: int = 0,
        drop_duplicates: bool = True,
        keep: str = 'first'
    ) -> "TimeSeries":
        """
        Concatenate a list of TimeSeries objects along the specified axis

        Args:
            tss(list[TimeSeries]): A list of TimeSeries objects.
                All TimeSeries' freqs are required to be consistent.
            axis(int): The axis along which to concatenate the TimeSeries objects
                (0: along the time index, 1: along the columns)
            drop_duplicates(bool): Drop duplicate indices (axis=0) or duplicate columns (axis=1).
                When False, duplicates cause an error instead.
            keep(str): keep 'first' or 'last' when drop duplicates.

        Returns:
            TimeSeries

        Raise:
            ValueError
        """
        raise_if_not(
            len(set(i.freq for i in tss)) == 1,
            f"Failed to concatenate, the freqs of TimeSeries objects are not consistent ."
        )
        raise_if_not(
            keep in ["first", "last"],
            "keep should set to 'first' or 'last'")
        if axis == 0:
            data = pd.concat([ts.data for ts in tss], axis=axis)
            if drop_duplicates:
                data = data[~data.index.duplicated(keep=keep)]
                if isinstance(tss[0].data.index, pd.RangeIndex):
                    # After concatenation a RangeIndex becomes an Int64Index,
                    # so it has to be converted back to a RangeIndex.
                    data = data.set_index(pd.RangeIndex(
                        data.index.values[0],
                        data.index.values[-1] + tss[0].freq,
                        tss[0].freq
                    ))
            else:
                raise_if(
                    data.index.duplicated().any(),
                    "Failed to concatenate, duplicated values found in the time column. "
                    "You can set drop_duplicates = True to drop Duplicate values. "
                    "And you can set keep = 'first' or 'last' to choose which value to preserve."
                )
            return TimeSeries(data, tss[0].freq)
        elif axis == 1:
            data = pd.concat([ts.data for ts in tss], axis=axis)
            if drop_duplicates:
                data = data.loc[:, ~data.columns.duplicated(keep=keep)]
            else:
                raise_if(
                    data.columns.duplicated().any(),
                    "Failed to concatenate, duplicated column names found. "
                    "You can set drop_duplicates = True to drop Duplicate columns. "
                    "And you can set keep = 'first' or 'last' to choose which value to preserve."
                )
            return TimeSeries(data, tss[0].freq)
        else:
            raise_log(
                ValueError(f"Failed to concatenate, invalid axis: {axis}")
            )

    def reindex(self, index, fill_value=np.nan, *args, **kwargs) -> "TimeSeries":
        """
        Reindex the TimeSeries object (in place) with optional filling logic

        Args:
            index: array-like, new index to conform. Preferably an Index object to avoid
                duplicating data.
            fill_value: Value to use for missing values. NaN by default, but can be any
                "compatible" value.
            args: Optional arguments passed to `DataFrame.reindex`
            kwargs: Optional arguments passed to `DataFrame.reindex`

        Returns:
            TimeSeries: self

        Raise:
            ValueError
        """
        self._data = self._data.reindex(index, *args, fill_value=fill_value, **kwargs)
        return self

    def sort_columns(self, ascending: bool = True) -> "TimeSeries":
        """
        Sort the columns of the TimeSeries object by their labels (in place)

        Args:
            ascending(bool): Sort ascending or descending.

        Returns:
            TimeSeries: self
        """
        self._data = self._data.sort_index(axis=1, ascending=ascending)
        return self

    def _find_end_index(self) -> int:
        """
        Find the position of the last row that is not entirely NaN.

        Return:
            int: The position of the last row holding at least one non-NaN value, or -1 when
            there is no such row (including empty data).
        """
        has_value = self.data.notna().any(axis=1).to_numpy()
        positions = np.flatnonzero(has_value)
        return int(positions[-1]) if positions.size else -1

    def drop_tail_nan(self) -> "TimeSeries":
        """
        Drop trailing consecutive rows whose values are all NaN (in place)

        Returns:
            TimeSeries: self
        """
        end_index = self._find_end_index()
        return self.reindex(self.time_index[:end_index + 1])

    def to_json(self) -> str:
        """
        Return a str json representation of the TimeSeries object.

        Returns:
            str
        """
        json_res = {'freq': self._freq}
        if isinstance(self._freq, str):
            # Timestamps are not JSON serializable as dict keys; store them as strings.
            data = self._data.copy()
            data.index = data.index.astype(str)
        else:
            data = self._data
        json_res['data'] = data.to_dict()
        return json.dumps(json_res, ensure_ascii=False)

    @classmethod
    def load_from_json(cls, json_data: str, **json_load_kwargs) -> "TimeSeries":
        """
        Construct a TimeSeries object from a str json_data

        Args:
            json_data(str): json object from which to load data
            **json_load_kwargs: Optional arguments passed to `json.loads` function

        Returns:
            TimeSeries
        """
        res = json.loads(json_data, **json_load_kwargs)
        freq = res['freq']
        data = res['data']
        data = pd.DataFrame(data)
        if isinstance(freq, str):
            data.index = pd.DatetimeIndex(data.index.to_list())
        else:
            # JSON object keys are strings: convert them back to ints and order the rows
            # numerically before attaching the RangeIndex, so labels never depend on key order.
            data.index = data.index.astype(int)
            data = data.sort_index()
            start_idx, stop_idx = data.index.min(), data.index.max() + freq
            data.index = pd.RangeIndex(
                start=start_idx,
                stop=stop_idx,
                step=freq
            )
        return TimeSeries(data, freq)

    def to_categorical(self, col: Optional[Union[str, List[str]]] = None) -> "TimeSeries":
        """
        Modify col's type to int as categorical (in place).

        Args:
            col(Optional[Union[str, List[str]]]): col names in ts; all columns when not set

        Returns:
            TimeSeries: self
        """
        if col:
            if isinstance(col, str):
                col = [col]
            dtype = {col_one: np.int64 for col_one in col}
            return self.astype(dtype)
        return self.astype(np.int64)

    def to_numeric(self, col: Optional[Union[str, List[str]]] = None) -> "TimeSeries":
        """
        Modify col's type to float as numeric (in place).

        Args:
            col(Optional[Union[str, List[str]]]): col names in ts; all columns when not set

        Returns:
            TimeSeries: self
        """
        if col:
            if isinstance(col, str):
                col = [col]
            dtype = {col_one: np.float32 for col_one in col}
            return self.astype(dtype)
        return self.astype(np.float32)
class TSDataset(object):
    """
    TSDataset is the fundamental data class in PaddleTS, which is designed as the first-class
    citizen to represent the time series data. It is widely used in PaddleTS. In many cases, a
    function consumes a TSDataset and produces another TSDataset.
    A TSDataset object is comprised of two kinds of time series data:

        1. Target: the key time series data in the time series modeling tasks (e.g. the series
           that need to be forecast in time series forecasting tasks).
        2. Covariate: the relevant time series data which are usually helpful for the time series
           modeling tasks.

    Currently, it supports the representation of:

        1. Time series of single target w/wo covariates.
        2. Time series of multiple targets w/wo covariates.

    And the covariates can be categorized into one of the following 3 types:

        1. Observed covariates (`observed_cov`):
            referring to those variables which can only be observed in the historical data,
            e.g. measured temperatures
        2. Known covariates (`known_cov`):
            referring to those variables which can be determined at present for future time
            steps, e.g. weather forecasts
        3. Static covariates (`static_cov`):
            referring to those variables which keep constant over time

    A TSDataset object includes one or more TimeSeries objects, representing targets,
    known covariates (known_cov), observed covariates (observed_cov), and static covariates
    (static_cov), respectively.

    Args:
        target(TimeSeries|None): Target
        observed_cov(TimeSeries|None): Observed covariates
        known_cov(TimeSeries|None): Known covariates
        static_cov(dict|None): Static covariates
        fill_missing_dates(bool): Fill missing dates or not
        fillna_method(str): Method of filling missing values. Totally 7 methods are supported
            currently:
            max: Use the max value in the sliding window
            min: Use the min value in the sliding window
            avg: Use the mean value in the sliding window
            median: Use the median value in the sliding window
            pre: Use the previous value
            back: Use the next value
            zero: Use 0s
        fillna_window_size(int): Size of the sliding window

    Returns:
        None
    """

    def __init__(
        self,
        target: Optional["TimeSeries"] = None,
        observed_cov: Optional["TimeSeries"] = None,
        known_cov: Optional["TimeSeries"] = None,
        static_cov: Optional[dict] = None,
        fill_missing_dates: bool = False,
        fillna_method: str = "pre",
        fillna_window_size: int = 10,
    ):
        self._target = target
        self._observed_cov = observed_cov
        self._known_cov = known_cov
        self._static_cov = static_cov
        # The type of freq is str when time_index is DatetimeIndex, int when time_index is RangeIndex
        self._freq: Optional[Union[str, int]] = None
        self._check_data()
        # Get built-in analysis operators (imported lazily inside the constructor)
        from paddlets.analysis import TSDataset_Inner_Analyzer
        self._inner_analyzer = TSDataset_Inner_Analyzer
        if fill_missing_dates:
            # Fill the missing values
            from paddlets.transform import Fill
            fill_obj = Fill(
                cols=list(self.columns.keys()),
                method=fillna_method,
                window_size=fillna_window_size
            )
            fill_obj.fit_transform(self, inplace=True)

    def __getattr__(self, name: str) -> Callable:
        """
        Dynamically integrate and call built-in operators
        (Only analysis operators are currently integrated, and other types of operators may be
        integrated in the future)

        Only invoked when normal attribute lookup fails.

        Args:
            name(str): operator name, eg: summary, max, min

        Returns:
            Callable: operator function, bound to this dataset as its first argument

        Raise:
            AttributeError: for names wrapped in underscores (including dunder names), or when
                the analyzer registry has not been initialised yet
            ValueError: when `name` is not a registered operator
        """
        if name.startswith('_') and name.endswith('_'):
            # Protocol lookups (e.g. pickle/copy probing dunder hooks) must get AttributeError.
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        # Read the registry straight from the instance dict: going through `self._inner_analyzer`
        # would re-enter __getattr__ forever on an instance whose __init__ has not run.
        analyzers = self.__dict__.get("_inner_analyzer")
        if analyzers is None:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        # analysis operators
        if name in analyzers:
            # The first parameter of the analysis operator needs to be the TSDataset.
            # functools.partial can only provide the ability of default parameters, which is
            # not flexible enough to use here.
            def partial(*arg, **kwargs):
                return analyzers[name](self, *arg, **kwargs)
            return partial
        else:
            raise_log(
                ValueError(f"attr: {name} doesn't exist!")
            )

    def _check_data(self):
        """
        Validate the components of the dataset and refresh `self._freq`.

        Checks that target, observed_cov and known_cov (those that are set) share exactly one
        freq, and that column names (including static_cov keys) are unique across components.

        Raise:
            ValueError
        """
        freq_list = []
        columns_list = []
        if self._target is not None:
            freq_list.append(self._target.freq)
            columns_list += list(self._target.columns)
        if self._observed_cov is not None:
            freq_list.append(self._observed_cov.freq)
            columns_list += list(self._observed_cov.columns)
        if self._known_cov is not None:
            freq_list.append(self._known_cov.freq)
            columns_list += list(self._known_cov.columns)
        if self._static_cov is not None:
            columns_list += list(self._static_cov.keys())
        # check freq
        raise_if(
            len(set(freq_list)) != 1,
            "The freqs of target, observed_covariate, and known_covariate are not consistent."
        )
        self._freq = freq_list[0]
        # check columns
        raise_if(
            len(set(columns_list)) != len(columns_list),
            "Duplicated column names in target, observed_covariate, and known_covariate."
        )
@classmethod
def load_from_csv(
    cls,
    filepath_or_buffer: str,
    group_id: str = None,
    time_col: Optional[str] = None,
    target_cols: Optional[Union[List[str], str]] = None,
    label_col: Optional[Union[List[str], str]] = None,
    observed_cov_cols: Optional[Union[List[str], str]] = None,
    feature_cols: Optional[Union[List[str], str]] = None,
    known_cov_cols: Optional[Union[List[str], str]] = None,
    static_cov_cols: Optional[Union[List[str], str]] = None,
    freq: Optional[Union[str, int]] = None,
    fill_missing_dates: bool = False,
    fillna_method: str = "pre",
    fillna_window_size: int = 10,
    drop_tail_nan: bool = False,
    dtype: Optional[Union[type, Dict[str, type]]] = None,
    **kwargs
) -> Union["TSDataset", List["TSDataset"]]:
    """
    Construct a TSDataset object from a csv file

    The file is read with `pandas.read_csv` and the resulting DataFrame is handed to
    `load_from_dataframe` together with all the other arguments.

    Args:
        filepath_or_buffer(str): The path to the CSV file, or the file object; consistent with
            the argument of `pandas.read_csv` function
        group_id(str|None): The column name identifying a time series; forwarded to
            `load_from_dataframe`. When provided, a list of TSDataset may be returned.
        time_col(str|None): The name of time column
        target_cols(list|str|None): The names of columns for target
        label_col(list|str|None): The names of columns for label in anomaly detection
        observed_cov_cols(list|str|None): The names of columns for observed covariates
        feature_cols(list|str|None): The names of columns for feature in anomaly detection
        known_cov_cols(list|str|None): The names of columns for known covariates
        static_cov_cols(list|str|None): The names of columns for static covariates
        freq(str|int|None): A str or int representing the DateTimeIndex's frequency or
            RangeIndex's step size
        fill_missing_dates(bool): Fill missing dates or not
        fillna_method(str): Method of filling missing values
            (max, min, avg, median, pre, back, zero)
        fillna_window_size(int): Size of the sliding window
        drop_tail_nan(bool): Drop time series tail nan value or not
        dtype(np.dtype|type|dict): dtype (or {col: dtype}) to cast the loaded data to
        kwargs: Optional arguments passed to `pandas.read_csv`

    Returns:
        Union[TSDataset, List[TSDataset]]
    """
    frame = pd.read_csv(filepath_or_buffer=filepath_or_buffer, **kwargs)
    # Everything except the reader kwargs is forwarded unchanged to the DataFrame loader.
    loader_options = dict(
        group_id=group_id,
        time_col=time_col,
        target_cols=target_cols,
        label_col=label_col,
        observed_cov_cols=observed_cov_cols,
        feature_cols=feature_cols,
        known_cov_cols=known_cov_cols,
        static_cov_cols=static_cov_cols,
        freq=freq,
        fill_missing_dates=fill_missing_dates,
        fillna_method=fillna_method,
        fillna_window_size=fillna_window_size,
        drop_tail_nan=drop_tail_nan,
        dtype=dtype,
    )
    return cls.load_from_dataframe(df=frame, **loader_options)
@classmethod
def load_from_dataframe(
    cls,
    df: pd.DataFrame,
    group_id: str = None,
    time_col: Optional[str] = None,
    target_cols: Optional[Union[List[str], str]] = None,
    label_col: Optional[Union[List[str], str]] = None,
    observed_cov_cols: Optional[Union[List[str], str]] = None,
    feature_cols: Optional[Union[List[str], str]] = None,
    known_cov_cols: Optional[Union[List[str], str]] = None,
    static_cov_cols: Optional[Union[List[str], str]] = None,
    freq: Optional[Union[str, int]] = None,
    fill_missing_dates: bool = False,
    fillna_method: str = "pre",
    fillna_window_size: int = 10,
    drop_tail_nan: bool = False,
    dtype: Optional[Union[type, Dict[str, type]]] = None
) -> Union["TSDataset", List["TSDataset"]]:
    """
    Construct a TSDataset object from a DataFrame

    Args:
        df(pd.DataFrame): pandas.DataFrame object from which to load data
        group_id(str|None): The column name identifying a time series. When provided, the rows
            are partitioned by the distinct values of this column and one TSDataset is built
            per partition; a list is returned when there is more than one partition.
        time_col(str|None): The name of time column
        target_cols(list|str|None): The names of columns for target
        label_col(list|str|None): The names of columns for label in anomaly detection; used as
            the target and mutually exclusive with `target_cols`
        observed_cov_cols(list|str|None): The names of columns for observed covariates
        feature_cols(list|str|None): The names of columns for feature in anomaly detection; used
            as the observed covariates and mutually exclusive with the cov column arguments
        known_cov_cols(list|str|None): The names of columns for known covariates
        static_cov_cols(list|str|None): The names of columns for static covariates
        freq(str|int|None): A str or int representing the DateTimeIndex's frequency or
            RangeIndex's step size
        fill_missing_dates(bool): Fill missing dates or not
        fillna_method(str): Method of filling missing values
            (max, min, avg, median, pre, back, zero)
        fillna_window_size(int): Size of the sliding window
        drop_tail_nan(bool): Drop time series tail nan value or not
        dtype(np.dtype|type|dict): dtype (or {col: dtype}) to cast every built TSDataset to

    Returns:
        Union[TSDataset, List[TSDataset]]
    """
    raise_if_not(
        df.columns.is_unique,
        "The column names of the input DataFrame are not unique."
    )
    # One frame per group value, or the whole frame when no grouping is requested.
    if group_id is None:
        frames = [df]
    else:
        raise_if_not(
            group_id in df.columns,
            f"group_id: {group_id} not in df!"
        )
        frames = [
            df[df[group_id].isin([group_value])]
            for group_value in df[group_id].unique()
        ]
    # Anomaly-detection aliases: label -> target, feature -> observed covariates.
    if label_col:
        raise_if(
            target_cols is not None,
            "label_col and target_cols cannot pass at the same time!"
        )
        raise_if(
            isinstance(label_col, list) and len(label_col) > 1,
            "The length of label_col must be 1."
        )
        target_cols = label_col
    if feature_cols:
        raise_if(
            observed_cov_cols is not None or known_cov_cols is not None,
            "feature_cols and cov_cols cannot pass at the same time!"
        )
        observed_cov_cols = feature_cols

    def build_series(frame, cols):
        # Build one TimeSeries from `cols`, optionally trimming trailing all-NaN rows.
        series = TimeSeries.load_from_dataframe(frame, time_col, cols, freq)
        if drop_tail_nan:
            series.drop_tail_nan()
        return series

    nothing_selected = not any([target_cols, observed_cov_cols, known_cov_cols, static_cov_cols])
    datasets = []
    for frame in frames:
        target = observed_cov = known_cov = None
        static_cov = dict()
        if nothing_selected:
            # By default all columns are target columns
            target = build_series(frame, [name for name in frame.columns if name != time_col])
        else:
            if target_cols:
                target = build_series(frame, target_cols)
            if observed_cov_cols:
                observed_cov = build_series(frame, observed_cov_cols)
            if known_cov_cols:
                known_cov = build_series(frame, known_cov_cols)
            if static_cov_cols:
                if isinstance(static_cov_cols, str):
                    static_cov_cols = [static_cov_cols]
                for col in static_cov_cols:
                    # A static covariate must exist and hold a single distinct value.
                    raise_if(
                        col not in frame.columns or len(np.unique(frame[col])) != 1,
                        "static cov cals data is not in columns or schema is not right!"
                    )
                    static_cov[col] = frame[col].iloc[0]
        datasets.append(cls(
            target,
            observed_cov,
            known_cov,
            static_cov,
            fill_missing_dates,
            fillna_method,
            fillna_window_size,
        ))
    if dtype:
        for dataset in datasets:
            dataset.astype(dtype)
    if len(datasets) > 1:
        return datasets
    return datasets[0]
def to_dataframe(self, copy: bool=True) -> pd.DataFrame:
    """
    Return a pd.DataFrame representation of the TSDataset object

    The target columns (when a target is set) come first, followed by the columns of the
    merged covariates returned by `get_all_cov()` (when any).

    Args:
        copy(bool): Return a copy of or a reference to the underlying DataFrame objects

    Returns:
        pd.DataFrame
    """
    covariates = self.get_all_cov()
    frames = [
        series.to_dataframe(copy)
        for series in (self._target, covariates)
        if series is not None
    ]
    return pd.concat(frames, axis=1)
def to_numpy(self, copy: bool=True) -> np.ndarray:
    """
    Return a np.ndarray representation of the TSDataset object

    Args:
        copy(bool): Forwarded to `to_dataframe`: return a copy of or a reference to the
            underlying DataFrame objects. Note that copy=False does not ensure that to_numpy()
            is no-copy.
            refer:https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_numpy.html

    Returns:
        np.ndarray
    """
    frame = self.to_dataframe(copy)
    return frame.to_numpy()
def get_target(self) -> Optional["TimeSeries"]:
    """
    Return the target component of the dataset.

    Returns:
        TimeSeries|None: target
    """
    target = self._target
    return target
def get_label(self) -> Optional["TimeSeries"]:
    """
    Return the label, which is an alias of the target (delegates to `get_target`).

    Returns:
        TimeSeries|None: target
    """
    label = self.get_target()
    return label
def get_observed_cov(self) -> Optional["TimeSeries"]:
    """
    Return the observed covariates of the dataset.

    Returns:
        TimeSeries|None: observed_cov
    """
    observed = self._observed_cov
    return observed
def get_feature(self) -> Optional["TimeSeries"]:
    """
    Return the feature, which is an alias of the observed covariates
    (delegates to `get_observed_cov`).

    Returns:
        TimeSeries|None: observed_cov
    """
    feature = self.get_observed_cov()
    return feature
def get_known_cov(self) -> Optional["TimeSeries"]:
    """
    Return the known covariates of the dataset.

    Returns:
        TimeSeries|None: known_cov
    """
    known = self._known_cov
    return known
def get_static_cov(self) -> Optional[dict]:
    """
    Return the static covariates of the dataset (the stored dict itself, not a copy).

    Returns:
        dict|None: static_cov
    """
    static = self._static_cov
    return static
@property
def target(self) -> Optional["TimeSeries"]:
    """
    The target of this TSDataset

    Returns:
        TimeSeries|None: target
    """
    return self._target

@property
def label(self) -> Optional["TimeSeries"]:
    """
    Alias of the `target` property

    Returns:
        TimeSeries|None: target
    """
    return self.target

@property
def observed_cov(self) -> Optional["TimeSeries"]:
    """
    The observed covariates of this TSDataset

    Returns:
        TimeSeries|None: observed_cov
    """
    return self._observed_cov

@property
def feature(self) -> Optional["TimeSeries"]:
    """
    Alias of the `observed_cov` property

    Returns:
        TimeSeries|None: observed_cov
    """
    return self.observed_cov

@property
def known_cov(self) -> Optional["TimeSeries"]:
    """
    The known covariates of this TSDataset

    Returns:
        TimeSeries|None: known_cov
    """
    return self._known_cov

@property
def static_cov(self) -> Optional[dict]:
    """
    The static covariates of this TSDataset (the stored dict itself, not a copy)

    Returns:
        dict|None: static_cov
    """
    return self._static_cov
def get_all_cov(self) -> Optional["TimeSeries"]:
    """
    Merge observed_cov and known_cov into a single TimeSeries

    When only one of the two exists, that object itself is returned (no copy).
    When both exist, their columns are concatenated (observed_cov first) into a
    new TimeSeries that uses the freq of known_cov.

    Returns:
        TimeSeries|None: The merged covariates, or None if there are none
    """
    observed, known = self._observed_cov, self._known_cov
    if known is None:
        return observed
    if observed is None:
        return known
    merged = pd.concat([observed.data, known.data], axis=1)
    return TimeSeries(merged, known.freq)
def set_target(self, target: "TimeSeries"):
    """
    Replace the target, then validate the dataset with `_check_data`

    Args:
        target(TimeSeries): New target

    Returns:
        None

    Raise:
        ValueError
    """
    # The new value is stored before validation runs
    self._target = target
    self._check_data()
def set_label(self, label: "TimeSeries"):
    """
    Alias of `set_target`

    Args:
        label(TimeSeries): New label

    Returns:
        None

    Raise:
        ValueError
    """
    outcome = self.set_target(label)
    return outcome
def set_observed_cov(self, observed_cov: "TimeSeries"):
    """
    Replace the observed covariates, then validate the dataset with `_check_data`

    Args:
        observed_cov(TimeSeries): New observed_cov

    Returns:
        None

    Raise:
        ValueError
    """
    # The new value is stored before validation runs
    self._observed_cov = observed_cov
    self._check_data()
def set_feature(self, feature: "TimeSeries"):
    """
    Alias of `set_observed_cov`

    Args:
        feature(TimeSeries): New feature

    Returns:
        None

    Raise:
        ValueError
    """
    outcome = self.set_observed_cov(feature)
    return outcome
def set_known_cov(self, known_cov: "TimeSeries"):
    """
    Replace the known covariates, then validate the dataset with `_check_data`

    Args:
        known_cov(TimeSeries): New known_cov

    Returns:
        None

    Raise:
        ValueError
    """
    # The new value is stored before validation runs
    self._known_cov = known_cov
    self._check_data()
def set_static_cov(self, static_cov: "dict", append: bool=True):
    """
    Merge into or replace the static covariates, then validate with `_check_data`

    Args:
        static_cov(dict): New static_cov
        append(bool): Merge into the existing static_cov (new values win on key clashes)
            or replace the existing static_cov. With nothing to merge into, the new
            dict is stored as-is.

    Returns:
        None

    Raise:
        ValueError
    """
    current = self._static_cov
    if append and current:
        # Build a fresh dict so the previously stored one is left untouched
        combined = dict(current)
        combined.update(static_cov)
        self._static_cov = combined
    else:
        self._static_cov = static_cov
    self._check_data()
@target.setter
def target(self, target: "TimeSeries"):
    """
    Replace the target, then validate the dataset with `_check_data`

    Args:
        target(TimeSeries): New target

    Returns:
        None

    Raise:
        ValueError
    """
    # The new value is stored before validation runs
    self._target = target
    self._check_data()

@label.setter
def label(self, label: "TimeSeries"):
    """
    Alias setter; delegates to `set_label`

    Args:
        label(TimeSeries): New target

    Returns:
        None

    Raise:
        ValueError
    """
    return self.set_label(label)

@observed_cov.setter
def observed_cov(self, observed_cov: "TimeSeries"):
    """
    Replace the observed covariates, then validate the dataset with `_check_data`

    Args:
        observed_cov(TimeSeries): New observed_cov

    Returns:
        None

    Raise:
        ValueError
    """
    self._observed_cov = observed_cov
    self._check_data()

@feature.setter
def feature(self, feature: "TimeSeries"):
    """
    Alias setter; delegates to `set_feature`

    Args:
        feature(TimeSeries): New feature

    Returns:
        None

    Raise:
        ValueError
    """
    return self.set_feature(feature)

@known_cov.setter
def known_cov(self, known_cov: "TimeSeries"):
    """
    Replace the known covariates, then validate the dataset with `_check_data`

    Args:
        known_cov(TimeSeries): New known_cov

    Returns:
        None

    Raise:
        ValueError
    """
    self._known_cov = known_cov
    self._check_data()

@static_cov.setter
def static_cov(self, static_cov: "dict"):
    """
    Replace (never merge) the static covariates, then validate with `_check_data`

    Args:
        static_cov(dict): New static_cov

    Returns:
        None

    Raise:
        ValueError
    """
    self._static_cov = static_cov
    self._check_data()
def split(
    self,
    split_point: Union[pd.Timestamp, str, float, int],
    after=True
) -> Tuple["TSDataset", "TSDataset"]:
    """
    Split the TSDataset object into two TSDataset objects at `split_point`

    When a target exists it is split first, and the end of its first part decides
    where observed_cov and known_cov are cut. Without a target, `split_point` is
    applied to the covariate directly, which is only allowed when at most one of
    observed_cov / known_cov is set.

    The first returned dataset keeps the complete known_cov; the second one only
    gets the part of known_cov after the split. static_cov is shared by both.

    Args:
        split_point(pd.Timestamp|str|float|int): Where to split the TSDataset, which could be
            `pd.Timestamp|str`: Only valid when the type of time_index is pd.DatetimeIndex
            `float`: The proportion of the length of the first TSDataset object
            `int`: Only valid when the type of time_index is pd.RangeIndex
            If the data of the split_point exists, it will be included in the first data
        after(bool): If `split_point` doesn't exist in the time column,
            use the next valid index (True) or the previous one (False)

    Returns:
        Tuple["TSDataset", "TSDataset"]

    Raise:
        ValueError
        TypeError
    """
    if self.target is None:
        raise_if(
            self.observed_cov is not None and self.known_cov is not None,
            "Failed to split, the TSDataset's target can not be None when both observed_cov and known_cov are not None."
        )
        head_target = tail_target = None
        boundary = split_point
    else:
        head_target, tail_target = self._target.split(split_point, after)
        boundary = head_target.end_time
        if isinstance(boundary, int):
            # Integer index: turn the last index value of the first part into
            # the number of steps that part covers
            offset = boundary - head_target.start_time
            boundary = offset // head_target.freq + 1

    head_observed = tail_observed = None
    if self._observed_cov:
        head_observed, tail_observed = self._observed_cov.split(boundary, after)

    tail_known = None
    if self._known_cov:
        _, tail_known = self._known_cov.split(boundary, after)

    first = TSDataset(head_target, head_observed, self._known_cov, self._static_cov)
    second = TSDataset(tail_target, tail_observed, tail_known, self._static_cov)
    return first, second
def get_item_from_column(self, column: Union[str, int]) -> Union["TimeSeries", dict]:
    """
    Find the container that owns `column`

    The lookup order is target, observed_cov, known_cov, static_cov; the first
    container holding the column name is returned.

    Args:
        column(str|int): column name

    Returns:
        Union["TimeSeries", dict]: The owning TimeSeries, or the static_cov dict

    Raise:
        ValueError: The column exists in none of the containers
    """
    lookup_order = (
        (self._target, self.get_target),
        (self._observed_cov, self.get_observed_cov),
        (self._known_cov, self.get_known_cov),
    )
    for series, getter in lookup_order:
        if series and column in series.columns:
            return getter()
    if self._static_cov and column in self._static_cov:
        return self.get_static_cov()
    raise ValueError(f"column: {column} not exists!")
def __getitem__(
    self,
    columns: Union[str, int, List[Union[str, int]]]
) -> Union[pd.Series, pd.DataFrame]:
    """
    Get data from the specified columns

    Columns are collected from target, observed_cov and known_cov (in that order)
    and finally from static_cov, whose constant values are expanded to a series
    aligned with the data collected so far.

    Args:
        columns(str|int|List): column names

    Returns:
        Union[pd.Series, pd.DataFrame]: A Series when exactly one column is selected,
            otherwise a DataFrame with the columns in the requested order

    Raise:
        ValueError
    """
    if isinstance(columns, (str, int)):
        columns = [columns]
    raise_if_not(
        len(set(columns)) == len(columns),
        "Duplicated values found in the columns"
    )

    res = None
    for series in (self._target, self._observed_cov, self._known_cov):
        if not series:
            continue
        wanted = [name for name in columns if name in series.columns]
        if not wanted:
            continue
        # A single hit is selected by scalar key so that pandas yields a Series
        key = wanted[0] if len(wanted) == 1 else wanted
        picked = series.data[key]
        res = picked if res is None else pd.concat([res, picked], axis=1)

    if self._static_cov:
        static_names = [name for name in columns if name in self._static_cov]
        if static_names:
            # Length and index are fixed before any static column is appended
            if res is None:
                repeat, static_index = 1, [1]
            else:
                repeat, static_index = res.shape[0], res.index
            for name in static_names:
                constant = pd.Series(
                    [self._static_cov[name]] * repeat,
                    index=static_index,
                    name=name
                )
                res = constant if res is None else pd.concat([res, constant], axis=1)

    if res is None:
        found = 0
    elif isinstance(res, pd.DataFrame):
        found = res.shape[1]
    else:
        found = 1
    raise_if_not(
        found == len(columns),
        "The specified columns don't exist!"
    )
    if isinstance(res, pd.DataFrame):
        return res[columns]
    return res
def set_column(
    self,
    column: Union[str, int],
    value: Union[pd.Series, str, int],
    type: str = 'known_cov'
):
    """
    Add a new column or update the existing column

    If `column` already exists anywhere in the dataset it is updated in place and
    `type` is ignored. Otherwise the column is added to the container named by `type`.

    Args:
        column(str|int): column name
        value(pd.Series|str|int): New column values. A pd.Series is re-indexed to the
            time index of the container it is written to. For static_cov the value
            can only be int or str.
        type(str): Only effective when adding a new column, where to put the new column:
            'target', 'known_cov', 'observed_cov' or 'static_cov'.
            By default, the new column will be added to known_cov.

    Returns:
        None

    Raise:
        ValueError
    """
    try:
        # Get the underlying container when the column exists
        attr = self.get_item_from_column(column)
    except ValueError:
        # The column doesn't exist, so add a new one
        if type in ('target', 'known_cov', 'observed_cov'):
            raise_if_not(
                isinstance(value, pd.Series),
                f"New column added to the {type} should be pd.Series."
            )
            attr_name = '_' + type
            existing = getattr(self, attr_name)
            if existing is not None:
                existing.data[column] = value.reindex(existing.time_index)
            else:
                setattr(self, attr_name, TimeSeries.load_from_dataframe(pd.DataFrame(
                    value.rename(column),
                    index=value.index
                )))
        elif type == 'static_cov':
            raise_if_not(
                isinstance(value, int) or isinstance(value, str),
                "New column added to the static_cov should be int or str"
            )
            if self._static_cov is not None:
                self._static_cov[column] = value
            else:
                self._static_cov = {column: value}
        else:
            raise_log(
                ValueError(f"Illegal type: {type}")
            )
        self._check_data()
        return

    # Update an existing column. Identity (not equality) tells the static_cov dict
    # apart from a TimeSeries container.
    if attr is self._static_cov:
        raise_if_not(
            isinstance(value, str) or isinstance(value, int),
            "value is illegal!"
        )
        attr[column] = value
    else:
        raise_if_not(
            isinstance(value, pd.Series),
            "value is illegal!"
        )
        attr.data[column] = value.reindex(attr.time_index)
def __setitem__( self, column: Union[str, int], value: Union[pd.Series, str, int] ): """ Update an existing column or add a new column to known_cov. For update, the column can be from the target, known_cov, observed_cov, or static_cov. For addition, new columns will be added to known_cov, and see set_column for other operations. Args: column(str|int): column name value(pd.Series|str|int): columns object, Its index must be the same as the index of the target property, the value can only be int or str when updating a column in static_cov Returns: None Raise: ValueError """ # tsdataset['a'] = b only works for adding or updating columns in know_cov and turn to set_column for other cases type = "known_cov" self.set_column(column, value, type) def __str__(self): """str""" return self.to_dataframe().__str__() def __repr__(self): """repr""" return self.to_dataframe().__repr__()
def drop(
    self,
    columns: Union[str, int, List[Union[str, int]]]
):
    """
    Drop column or columns in place

    Names that match no column are ignored. A container (target, observed_cov,
    known_cov or static_cov) that ends up without columns is reset to None.

    Args:
        columns(str|int|List): Column name or column names

    Returns:
        None

    Raise:
        ValueError
    """
    if isinstance(columns, (str, int)):
        columns = [columns]
    raise_if_not(
        len(set(columns)) == len(columns),
        "Duplicated column names found"
    )

    for attr_name in ("_target", "_observed_cov", "_known_cov"):
        series = getattr(self, attr_name)
        if series is None:
            continue
        doomed = [name for name in columns if name in series.columns]
        if not doomed:
            continue
        series.data.drop(doomed, axis=1, inplace=True)
        if series.data.shape[1] == 0:
            setattr(self, attr_name, None)

    if self._static_cov is not None:
        doomed_static = [name for name in columns if name in self._static_cov]
        if doomed_static:
            for name in doomed_static:
                del self._static_cov[name]
            if len(self._static_cov) == 0:
                self._static_cov = None
def plot(self,
         columns:Union[List[str], str] = None,
         add_data:Union[List["TSDataset"], "TSDataset"] = None,
         labels:Union[List[str], str] = None,
         low_quantile:float = 0.05,
         high_quantile:float = 0.95,
         central_quantile:float = 0.5,
         **kwargs) -> "pyplot":
    """
    plot function, a wrapper for Dataframe.plot()

    Args:
        columns(str|List): The names of columns to be plot. When columns is None, the targets
            (or the quantile components, for probability forecasting output) will be plot by default.
            At most the first 10 columns are plotted.
        add_data(List|TSDataset): Additional datasets to plot on the same axes, the default is None
        labels(str|List): Custom labels, length should be equal to nums of added datasets.
        central_quantile(float): The quantile (between 0 and 1) to plot as a "central" value,
            For instance, setting `central_quantile=0.5` will plot the median of each component.
            (only used when dataset is probability forecasting output)
        low_quantile(float): The quantile to use for the lower bound of the plotted confidence interval
            (default 0.05). This is applied to each component separately.
            (only used when dataset is probability forecasting output)
        high_quantile(float): The quantile to use for the upper bound of the plotted confidence interval
            (default 0.95). This is applied to each component separately.
            (only used when dataset is probability forecasting output)
        **kwargs: Optional arguments passed to `Dataframe.plot` function. Defaults applied when
            absent: kind="line", grid=True, figsize=(10, 3).

    Returns:
        The object returned by the last `plot` call on the underlying pandas data

    Raise:
        ValueError
    """
    def _plottable_columns(ts, ts_quantile_cols):
        # Names accepted in `columns` for one dataset: all time series columns, or
        # for quantile output the origin names plus the covariate columns.
        if len(ts_quantile_cols) == 0:
            return ts.columns.keys()
        names = ts_quantile_cols
        if ts.known_cov:
            names = names + ts.known_cov.columns.values.tolist()
        if ts.observed_cov:
            names = names + ts.observed_cov.columns.values.tolist()
        return names

    quantile_cols = self._get_quantile_cols_origin_names()
    if not columns:
        if len(quantile_cols) == 0:
            columns = self._target.columns.values.tolist()
        else:
            columns = quantile_cols
    if isinstance(columns, str):
        columns = [columns]
    if len(columns) > 10:
        logger.info(f"To many columns to print ({len(columns)}), Plotting only the first 10 columns.")
        columns = columns[:10]

    # Plot defaults: line chart, background grid, fixed figure size
    kwargs.setdefault("kind", "line")
    kwargs.setdefault("grid", True)
    kwargs.setdefault("figsize", (10, 3))

    central_quantile_str = "@quantile" + str(float(central_quantile * 100))

    # plot self data
    all_cols = _plottable_columns(self, quantile_cols)
    raise_if_not(set(columns) <= set(all_cols),
                 f"Columns {set(columns) - set(all_cols)} do not exist in origin datasets!")
    for column in columns:
        if column in quantile_cols:
            # quantile plot: central line plus the shaded interval
            df = self[column + central_quantile_str]
            plot = df.plot(**kwargs, label = column)
            self._fill_between_quantiles(column, low_quantile, high_quantile, **kwargs)
        else:
            # normal plot
            df = self[column]
            plot = df.plot(**kwargs, label = column)
    plot.legend()

    # plot added data
    if add_data:
        if isinstance(add_data, TSDataset):
            add_data = [add_data]
        col_len = len(columns)
        for ts in add_data:
            ts_quantile_cols = ts._get_quantile_cols_origin_names()
            all_cols = _plottable_columns(ts, ts_quantile_cols)
            raise_if_not(set(columns) <= set(all_cols),
                         f"Columns {set(columns) - set(all_cols)} do not exist in added datasets!")
            if ts.freq != self.freq:
                logger.warning("Add datas have different frequency with origin data!")
            for column in columns:
                if column in ts_quantile_cols:
                    df = ts[column + central_quantile_str]
                    plot = df.plot(**kwargs)
                    ts._fill_between_quantiles(column, low_quantile, high_quantile, **kwargs)
                else:
                    df = ts[column]
                    plot = df.plot(**kwargs)

        # change labels
        _, origin_labels = plot.get_legend_handles_labels()
        # Strip the quantile suffix for any `central_quantile`, not only for 0.5
        origin_labels = [text.split("@quantile")[0] for text in origin_labels]
        if labels:
            prefixes = [labels] if isinstance(labels, str) else labels
            raise_if(len(prefixes) != len(add_data),
                     f"Custom labels does not match added datasets num:{len(add_data)}")
        else:
            prefixes = ["Add" + str(count) for count in range(1, len(add_data) + 1)]
        labels = origin_labels
        # NOTE(review): the index arithmetic assumes exactly one legend entry per plotted
        # column; the interval entries added for quantile plots may shift it - confirm.
        for count, prefix in enumerate(prefixes, start=1):
            for i in range(col_len * count, col_len * (count + 1)):
                labels[i] = prefix + "-" + labels[i]
        plot.legend(labels)
    return plot
def _fill_between_quantiles(self, column: str = None, low_quantile: float = 0.05, high_quantile: float = 0.95, **kwargs): """ Fill color between quantiles Args: columns(str|List): The names of columns to be plot. When columns is None, the targets will be plot by default. low_quantile(float): The quantile to use for the lower bound of the plotted confidence interval. No confidence interval is shown if `confidence_low_quantile` is None (default 0.05). (only used when dataset is probability forecasting output ) high_quantile(float):The quantile to use for the upper bound of the plotted confidence interval. No confidence interval is shown if `high_quantile` is None (default 0.95). (only used when dataset is probability forecasting output ) Return: None """ alpha = 0.25 raise_if_not("@quantile" in self._target.columns[0], "This dataset do not have quantile info!") raise_if(low_quantile < 0 or low_quantile > 1, "Low quantile value should between 0 and 1!") raise_if(high_quantile < 0 or high_quantile > 1, "High quantile value should between 0 and 1!") raise_if(high_quantile < low_quantile, "Low quantile value should smaller than high quantile!") low_quantile_str = "@quantile" + str(float(low_quantile * 100)) high_quantile_str = "@quantile" + str(float(high_quantile * 100)) plt.fill_between(self.target.data.index, self[column + low_quantile_str].values, self[column + high_quantile_str].values, alpha=( alpha if "alpha" not in kwargs else kwargs["alpha"] ), label =str(int(low_quantile * 100)) + "%-" + str(int(high_quantile * 100)) + "% probability interval" ) def _get_quantile_cols_origin_names(self) -> List[str]: """ Get quantile cols origin names Return: List[str] """ origin_columns = [] for name in self.target.columns: tmp = name.split("@quantile") if tmp[0] not in origin_columns and len(tmp) > 1: origin_columns.append(tmp[0]) return origin_columns
def copy(self) -> "TSDataset":
    """
    Make a copy of the TSDataset object

    Each time series is duplicated with its own `copy` method and static_cov with
    `deepcopy`; parts that are missing or falsy become None in the copy.

    Returns:
        TSDataset
    """
    series_copies = [
        part.copy() if part else None
        for part in (self._target, self._observed_cov, self._known_cov)
    ]
    static_copy = deepcopy(self._static_cov) if self._static_cov else None
    return TSDataset(*series_copies, static_copy)
def save(self, file: str):
    """
    Pickle the TSDataset object into a file (the file is overwritten)

    Args:
        file(str): file path
    """
    with open(file, 'wb') as handle:
        pickle.dump(self, handle)
@classmethod
def load(cls, file: str) -> "TSDataset":
    """
    Load TSDataset from a file written by `save`

    Note: the file is read with pickle, which can execute arbitrary code while
    loading. Only load files from trusted sources.

    Args:
        file(str): file path

    Returns:
        TSDataset
    """
    with open(file, 'rb') as handle:
        restored = pickle.load(handle)
    return restored
def to_json(self) -> str:
    """
    Return a str json representation of the TSDataset object.

    The result is a JSON object with the keys 'target', 'observed_cov', 'known_cov'
    and 'static_cov'. Each value is either null or a nested JSON string (produced by
    `TimeSeries.to_json` for the time series, by `json.dumps` for static_cov).

    Returns:
        str

    Raise:
        TypeError: static_cov holds a value that cannot be serialized
    """
    def _numpy_to_builtin(obj):
        # `astype` stores static covariates as NumPy scalars, which the json module
        # rejects unless they are converted to plain Python values.
        if isinstance(obj, np.generic):
            return obj.item()
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

    res = {}
    for attr in ('target', 'observed_cov', 'known_cov'):
        attr_ts = getattr(self, attr)
        res[attr] = attr_ts.to_json() if attr_ts is not None else None
    if self.static_cov is not None:
        res['static_cov'] = json.dumps(
            self.static_cov, ensure_ascii=False, default=_numpy_to_builtin
        )
    else:
        res['static_cov'] = None
    return json.dumps(res)
@classmethod
def load_from_json(cls, json_data: str, **json_load_kwargs) -> "TSDataset":
    """
    Construct a TSDataset object from a json string in the layout written by `to_json`

    Args:
        json_data(str): json string from which to load data; it must contain the keys
            'target', 'observed_cov', 'known_cov' and 'static_cov'
        **json_load_kwargs: Optional arguments passed to `json.loads` function
            (also forwarded to `TimeSeries.load_from_json`)

    Returns:
        TSDataset
    """
    payload = json.loads(json_data, **json_load_kwargs)
    # Entries stored as null are left out so the constructor defaults apply
    params = {
        attr: TimeSeries.load_from_json(payload[attr], **json_load_kwargs)
        for attr in ('target', 'observed_cov', 'known_cov')
        if payload[attr] is not None
    }
    static_raw = payload['static_cov']
    if static_raw is not None:
        params['static_cov'] = json.loads(static_raw, **json_load_kwargs)
    return TSDataset(**params)
@property
def columns(self) -> dict:
    """
    Map every time series column name to the part of the dataset it belongs to
    (static columns are not included)

    Returns:
        dict: The key is the column name, and the value is the type,
            including target, known_cov, and observed_cov
    """
    mapping = {}
    groups = (
        ('target', self._target),
        ('known_cov', self._known_cov),
        ('observed_cov', self._observed_cov),
    )
    # A name present in several groups ends up with the type of the last group
    for kind, series in groups:
        if series is not None:
            mapping.update(dict.fromkeys(series.columns, kind))
    return mapping

@property
def freq(self):
    """Frequency of TSDataset"""
    frequency = self._freq
    return frequency
@classmethod
def concat(cls, tss: List["TSDataset"], axis: int = 0, drop_duplicates = True, keep = 'first') -> "TSDataset":
    """
    Concatenate a list of TSDataset objects along the specified axis

    Targets, known covariates and observed covariates are concatenated separately
    with `TimeSeries.concat`; datasets lacking a part are skipped for that part.
    Static covariates are merged into one dict.

    Args:
        tss(list[TSDataset]): A list of TSDataset objects.
            All TSDatasets' freqs are required to be consistent.
        axis(int): The axis along which to concatenate the TimeSeries objects
        drop_duplicates(bool): Drop duplicate indices.
        keep(str): keep 'first' or 'last' when drop duplicates.

    Returns:
        TSDataset

    Raise:
        ValueError: e.g. the same static_cov key carries different values
    """
    merged = {}
    for getter_name in ("get_target", "get_known_cov", "get_observed_cov"):
        parts = [getattr(ts, getter_name)() for ts in tss]
        parts = [part for part in parts if part is not None]
        if parts:
            merged[getter_name] = TimeSeries.concat(
                parts, axis, drop_duplicates=drop_duplicates, keep=keep
            )
        else:
            merged[getter_name] = None

    static_cov = {}
    for ts in tss:
        current = ts.get_static_cov()
        if current is None:
            continue
        for key, value in current.items():
            if key not in static_cov:
                static_cov[key] = value
                continue
            raise_if_not(
                static_cov[key] == value,
                f"static cov key: {key} have diffent value! concat failed!"
            )

    return TSDataset(
        merged["get_target"],
        merged["get_observed_cov"],
        merged["get_known_cov"],
        static_cov
    )
def astype(self, dtype: Union[np.dtype, type, Dict[str, Union[np.dtype, type]]]):
    """
    Cast a TSDataset object to the specified dtype (in place)

    Args:
        dtype(Union[np.dtype, type, Dict[str, Union[np.dtype, type]]]): Use a numpy.dtype or
            Python type to cast the target, all covariates and all static covariates to the
            same type. Alternatively, use {col: dtype, ...}, where col is a column label
            (time series or static) and dtype is a numpy.dtype or Python type, to cast only
            the listed columns. Passing None is a no-op.

    Returns:
        TSDataset: self, to allow chaining

    Raise:
        TypeError
        KeyError
    """
    series_attrs = {
        'target': '_target',
        'known_cov': '_known_cov',
        'observed_cov': '_observed_cov',
    }

    if isinstance(dtype, dict):
        per_group = {kind: {} for kind in series_attrs}
        static_cov_type = {}
        for key, value in dtype.items():
            raise_if_not(
                key in self.columns or \
                    (self._static_cov and key in self._static_cov),
                f"Invaild key: {key}"
            )
            if self._static_cov and key in self._static_cov:
                static_cov_type[key] = value
            else:
                per_group[self.columns[key]][key] = value
        for kind, attr_name in series_attrs.items():
            series = getattr(self, attr_name)
            if series is not None and per_group[kind]:
                series.astype(per_group[kind])
        if self._static_cov:
            for key, value in static_cov_type.items():
                self._static_cov[key] = np.array([self._static_cov[key]]).astype(value)[0]
    elif dtype is not None:
        # Compare against None instead of testing truthiness: a plain np.dtype
        # instance has len() == 0 and may evaluate as False, which would turn
        # the cast into a silent no-op.
        for attr_name in series_attrs.values():
            series = getattr(self, attr_name)
            if series is not None:
                series.astype(dtype)
        if self._static_cov:
            for key, value in list(self._static_cov.items()):
                self._static_cov[key] = np.array([value]).astype(dtype)[0]
    return self
@property
def dtypes(self) -> pd.Series:
    """
    Get dtypes of target, known_cov, observed_cov and (when present) static_cov,
    concatenated in that order

    Returns:
        pd.Series: <column name, dtype>
    """
    collected = [
        series.dtypes
        for series in (self._target, self._known_cov, self._observed_cov)
        if series is not None
    ]
    if self._static_cov:
        # Static values are wrapped in a one-row frame so pandas infers their dtypes
        static_frame = pd.DataFrame(self._static_cov, index=[0])
        collected.append(static_frame.dtypes)
    return pd.concat(collected)
def sort_columns(self, ascending: bool = True):
    """
    Sort the columns of target, known_cov and observed_cov by delegating to
    `sort_columns` of each underlying TimeSeries object

    Args:
        ascending(bool): Ascending or descending; passed through unchanged

    Returns:
        TSDataset: self, to allow chaining
    """
    for series in (self._target, self._known_cov, self._observed_cov):
        if series is not None:
            series.sort_columns(ascending)
    return self
def to_categorical(self, col: Optional[Union[str, List[str]]] = None):
    """
    Cast columns to np.int64 so that they can be used as categorical data

    Args:
        col(Optional[Union[str, List[str]]]): Column name(s) to cast; when omitted
            (or empty) the whole dataset is cast via `astype(np.int64)`
    """
    if not col:
        self.astype(np.int64)
        return
    names = [col] if isinstance(col, str) else col
    self.astype({name: np.int64 for name in names})
def to_numeric(self, col: Optional[Union[str, List[str]]] = None):
    """
    Cast columns to np.float32 so that they can be used as numeric data

    Args:
        col(Optional[Union[str, List[str]]]): Column name(s) to cast; when omitted
            (or empty) the whole dataset is cast via `astype(np.float32)`
    """
    if not col:
        self.astype(np.float32)
        return
    names = [col] if isinstance(col, str) else col
    self.astype({name: np.float32 for name in names})