Pipeline

Pipeline 可用于创建包含多个 特征工程 方法与一个学习器的时序建模机器学习工作流。

1. 组装特征工程工作流

时序数据预处理通常包含多个步骤。我们可将多个特征工程方法组装为一个Pipeline。

1.1. 准备数据

>>> import pandas as pd
>>> import numpy as np
>>> from paddlets.datasets.tsdataset import TimeSeries, TSDataset
>>> target = TimeSeries.load_from_dataframe(
>>>     pd.Series(
>>>         np.random.randn(10).astype(np.float32),
>>>         index=pd.date_range("2022-01-01", periods=10, freq="15T"),
>>>         name="target"
>>>     ))
>>> observed_cov = TimeSeries.load_from_dataframe(
>>>     pd.DataFrame(
>>>         np.random.randn(11, 2).astype(np.float32),
>>>         index=pd.date_range("2022-01-01", periods=11, freq="15T"),
>>>         columns=["observed_a", "observed_b"]
>>>     ))
>>> known_cov = TimeSeries.load_from_dataframe(
>>>     pd.DataFrame(
>>>         np.random.randn(15, 2).astype(np.float32),
>>>         index=pd.date_range("2022-01-01", periods=15, freq="15T"),
>>>         columns=["known_c", "known_d"]
>>>     ))
>>> tsdataset = TSDataset(target, observed_cov, known_cov)
>>> train_dataset, test_dataset = tsdataset.split(0.7)
>>> train_dataset
                       target  observed_a  observed_b   known_c   known_d
2022-01-01 00:00:00  0.222311   -0.277376    0.546331 -1.408227  0.662035
2022-01-01 00:15:00  0.317041    0.854092   -1.857899  0.314928 -0.767439
2022-01-01 00:30:00  1.513104    0.379383    0.850350 -0.909959 -1.331936
2022-01-01 00:45:00  0.598694   -0.445081   -1.326147  0.749286  1.723710
2022-01-01 01:00:00 -0.387747   -0.621718   -1.689694  1.437675 -0.621165
2022-01-01 01:15:00  0.476731    0.890895    0.058239 -0.487614  0.668113
2022-01-01 01:30:00  0.219381   -0.684207   -0.001203 -0.199150  1.221772
2022-01-01 01:45:00       NaN         NaN         NaN  1.413992 -0.452255
2022-01-01 02:00:00       NaN         NaN         NaN  0.228248  0.102397
2022-01-01 02:15:00       NaN         NaN         NaN  0.687319  0.240901
2022-01-01 02:30:00       NaN         NaN         NaN  0.075458 -0.922555
2022-01-01 02:45:00       NaN         NaN         NaN -1.718082  0.362322
2022-01-01 03:00:00       NaN         NaN         NaN -0.126352  1.376127
2022-01-01 03:15:00       NaN         NaN         NaN  1.508944 -2.041886
2022-01-01 03:30:00       NaN         NaN         NaN -0.852201  0.476529

>>> test_dataset
                       target  observed_a  observed_b   known_c   known_d
2022-01-01 01:45:00  1.861470    0.342490    2.156751  1.413992 -0.452255
2022-01-01 02:00:00 -0.942302    0.198202    0.527947  0.228248  0.102397
2022-01-01 02:15:00  0.754010   -1.400311   -0.105994  0.687319  0.240901
2022-01-01 02:30:00       NaN    1.359987    0.119287  0.075458 -0.922555
2022-01-01 02:45:00       NaN         NaN         NaN -1.718082  0.362322
2022-01-01 03:00:00       NaN         NaN         NaN -0.126352  1.376127
2022-01-01 03:15:00       NaN         NaN         NaN  1.508944 -2.041886
2022-01-01 03:30:00       NaN         NaN         NaN -0.852201  0.476529

对于上述数据的处理,我们可能希望利用 KSigma 对观察协变量与已知协变量做异常检测,然后利用 TimeFeatureGenerator 生成时间特征。

1.2. 构建

Pipeline 使用包含 (key, value) 对的列表进行初始化。 key 是特征工程方法的类名,value 是此方法的初始化参数。

Pipeline 由以下部分组成:

>>> from paddlets.pipeline.pipeline import Pipeline
>>> from paddlets.transform import KSigma, TimeFeatureGenerator
>>> pipeline = Pipeline([(KSigma, {"cols":["observed_a", "observed_b", "known_c", "known_d"], "k": 1}), (TimeFeatureGenerator, {})])

1.3. 执行特征工程

拟合 Pipeline 并执行特征工程。

>>> pipeline.fit(train_dataset)
>>> tsdataset_preprocessed = pipeline.transform(test_dataset)
>>> tsdataset_preprocessed
                       target  observed_a  observed_b   known_c   known_d  year  month  day  weekday  hour  quarter  dayofyear  weekofyear  is_holiday  is_workday
2022-01-01 01:45:00  1.861470    0.342490   -0.488575  0.047618 -0.452255  2022      1    1        5     1        1          1          52         1.0         0.0
2022-01-01 02:00:00 -0.942302    0.198202    0.527947  0.228248  0.102397  2022      1    1        5     2        1          1          52         1.0         0.0
2022-01-01 02:15:00  0.754010    0.013713   -0.105994  0.687319  0.240901  2022      1    1        5     2        1          1          52         1.0         0.0
2022-01-01 02:30:00       NaN    0.013713    0.119287  0.075458 -0.922555  2022      1    1        5     2        1          1          52         1.0         0.0
2022-01-01 02:45:00       NaN         NaN         NaN  0.047618  0.362322  2022      1    1        5     2        1          1          52         1.0         0.0
2022-01-01 03:00:00       NaN         NaN         NaN -0.126352  0.046445  2022      1    1        5     3        1          1          52         1.0         0.0
2022-01-01 03:15:00       NaN         NaN         NaN  0.047618  0.046445  2022      1    1        5     3        1          1          52         1.0         0.0
2022-01-01 03:30:00       NaN         NaN         NaN -0.852201  0.476529  2022      1    1        5     3        1          1          52         1.0         0.0

2. 组装含有学习器的工作流

Pipeline 的最后一个步骤可以是一个学习器,你仅需调用一次 fit 即可实现 Pipeline 中的各个特征工程方法与学习器对数据的拟合。

2.1. 构建

Pipeline 由以下部分组成:

  • 用于异常值检测的 Ksigma 特征工程方法。

  • 用于生成时间特征的 TimeFeatureGenerator 特征工程方法。

  • 用于对时序数据建立时序模型的 MLPRegressor 学习器。

>>> from paddlets.models.forecasting import MLPRegressor
>>> mlp_params = {
>>>     'in_chunk_len': 3,
>>>     'out_chunk_len': 2,
>>>     'skip_chunk_len': 0,
>>>     'eval_metrics': ["mse", "mae"]
>>> }
>>> pipeline = Pipeline([(KSigma, {"cols":["observed_a", "observed_b", "known_c", "known_d"], "k": 1}), (MLPRegressor, mlp_params)])

2.2. 拟合 Pipeline 并执行预测

对于拟合后的 Pipeline,你可以使用 pipeline.predict 进行时序预测或使用 recursive_predict 进行多步时序预测。

>>> pipeline.fit(train_dataset)
>>> predicted_results = pipeline.predict(train_dataset)
>>> predicted_results
                       target
2022-01-01 01:45:00 -0.034728
2022-01-01 02:00:00  0.156984

2.3. 递归多步预测

递归多步预测的策略是:一步步利用 pipeline.predict 方法实现多步时序预测。当前时刻的预测结果会被添加至 TSDataset 时序数据集 的目标列中,并在模型预测时,被添加至历史目标值的滑动窗口中用于下一时刻预测。

注意: pipeline.recursive_predict 函数在 pipeline.skip_chunk != 0 时,无法使用。

注意:每次调用 pipeline.predict 的输出长度为 out_chunk_len, 所以 pipeline.predict 会被调用 ceiling(predict_length/out_chunk_len) 次,以满足多步时序的输出长度需求。例如,上述例子中的 Pipelineout_chunk_length 是2,而 recursive_predict 允许你将 predict_length 设置为5或者更多:

>>> train_dataset.set_observed_cov(TimeSeries.concat([train_dataset.observed_cov, test_dataset.observed_cov]))
>>> train_dataset
                       target  observed_a  observed_b   known_c   known_d
2022-01-01 00:00:00  0.222311   -0.277376    0.546331 -1.408227  0.662035
2022-01-01 00:15:00  0.317041    0.854092   -1.857899  0.314928 -0.767439
2022-01-01 00:30:00  1.513104    0.379383    0.850350 -0.909959 -1.331936
2022-01-01 00:45:00  0.598694   -0.445081   -1.326147  0.749286  1.723710
2022-01-01 01:00:00 -0.387747   -0.621718   -1.689694  1.437675 -0.621165
2022-01-01 01:15:00  0.476731    0.890895    0.058239 -0.487614  0.668113
2022-01-01 01:30:00  0.219381   -0.684207   -0.001203 -0.199150  1.221772
2022-01-01 01:45:00       NaN    0.342490    2.156751  1.413992 -0.452255
2022-01-01 02:00:00       NaN    0.198202    0.527947  0.228248  0.102397
2022-01-01 02:15:00       NaN   -1.400311   -0.105994  0.687319  0.240901
2022-01-01 02:30:00       NaN    1.359987    0.119287  0.075458 -0.922555
2022-01-01 02:45:00       NaN         NaN         NaN -1.718082  0.362322
2022-01-01 03:00:00       NaN         NaN         NaN -0.126352  1.376127
2022-01-01 03:15:00       NaN         NaN         NaN  1.508944 -2.041886
2022-01-01 03:30:00       NaN         NaN         NaN -0.852201  0.476529

>>> recursive_predicted_results = pipeline.recursive_predict(train_dataset, predict_length=5)
>>> recursive_predicted_results
                       target
2022-01-01 01:45:00 -0.034728
2022-01-01 02:00:00  0.156984
2022-01-01 02:15:00  0.290443
2022-01-01 02:30:00 -0.007422
2022-01-01 02:45:00  0.025956

当known_cov或者observed_cov存在时,known_cov时间长度需要大于等于 递归预测步数*预测长度 ,observed_cov的时间长度需要大于等于 (递归预测步数-1)*预测长度 ,以满足预测时特征构建的需求。

注意: 在递归预测中,预测误差会被累计,因此预测性能会随着预测长度的增加而下降。

更多使用细节,请参考:API: pipeline.recursive_predict

3. 持久化

与其他的 PaddleTS 模型相似,Pipeline 提供了 save() 和 load() 函数以支持持久化的需要。

3.1. 保存

>>> pipeline.save(path="./")

3.2. 加载

>>> pipeline.load(path="./")
>>> predicted_results = pipeline.predict(train_dataset)