Pipeline
Pipeline 可用于创建包含多个 特征工程 方法与一个学习器的时序建模机器学习工作流。
1. 组装特征工程工作流
时序数据预处理通常包含多个步骤。我们可将多个特征工程方法组装为一个Pipeline。
1.1. 准备数据
>>> import pandas as pd
>>> import numpy as np
>>> from paddlets.datasets.tsdataset import TimeSeries, TSDataset
>>> target = TimeSeries.load_from_dataframe(
>>> pd.Series(
>>> np.random.randn(10).astype(np.float32),
>>> index=pd.date_range("2022-01-01", periods=10, freq="15T"),
>>> name="target"
>>> ))
>>> observed_cov = TimeSeries.load_from_dataframe(
>>> pd.DataFrame(
>>> np.random.randn(11, 2).astype(np.float32),
>>> index=pd.date_range("2022-01-01", periods=11, freq="15T"),
>>> columns=["observed_a", "observed_b"]
>>> ))
>>> known_cov = TimeSeries.load_from_dataframe(
>>> pd.DataFrame(
>>> np.random.randn(15, 2).astype(np.float32),
>>> index=pd.date_range("2022-01-01", periods=15, freq="15T"),
>>> columns=["known_c", "known_d"]
>>> ))
>>> tsdataset = TSDataset(target, observed_cov, known_cov)
>>> train_dataset, test_dataset = tsdataset.split(0.7)
>>> train_dataset
target observed_a observed_b known_c known_d
2022-01-01 00:00:00 0.222311 -0.277376 0.546331 -1.408227 0.662035
2022-01-01 00:15:00 0.317041 0.854092 -1.857899 0.314928 -0.767439
2022-01-01 00:30:00 1.513104 0.379383 0.850350 -0.909959 -1.331936
2022-01-01 00:45:00 0.598694 -0.445081 -1.326147 0.749286 1.723710
2022-01-01 01:00:00 -0.387747 -0.621718 -1.689694 1.437675 -0.621165
2022-01-01 01:15:00 0.476731 0.890895 0.058239 -0.487614 0.668113
2022-01-01 01:30:00 0.219381 -0.684207 -0.001203 -0.199150 1.221772
2022-01-01 01:45:00 NaN NaN NaN 1.413992 -0.452255
2022-01-01 02:00:00 NaN NaN NaN 0.228248 0.102397
2022-01-01 02:15:00 NaN NaN NaN 0.687319 0.240901
2022-01-01 02:30:00 NaN NaN NaN 0.075458 -0.922555
2022-01-01 02:45:00 NaN NaN NaN -1.718082 0.362322
2022-01-01 03:00:00 NaN NaN NaN -0.126352 1.376127
2022-01-01 03:15:00 NaN NaN NaN 1.508944 -2.041886
2022-01-01 03:30:00 NaN NaN NaN -0.852201 0.476529
>>> test_dataset
target observed_a observed_b known_c known_d
2022-01-01 01:45:00 1.861470 0.342490 2.156751 1.413992 -0.452255
2022-01-01 02:00:00 -0.942302 0.198202 0.527947 0.228248 0.102397
2022-01-01 02:15:00 0.754010 -1.400311 -0.105994 0.687319 0.240901
2022-01-01 02:30:00 NaN 1.359987 0.119287 0.075458 -0.922555
2022-01-01 02:45:00 NaN NaN NaN -1.718082 0.362322
2022-01-01 03:00:00 NaN NaN NaN -0.126352 1.376127
2022-01-01 03:15:00 NaN NaN NaN 1.508944 -2.041886
2022-01-01 03:30:00 NaN NaN NaN -0.852201 0.476529
对于上述数据的处理,我们可能希望利用 KSigma 对观察协变量与已知协变量做异常检测,然后利用 TimeFeatureGenerator 生成时间特征。
1.2. 构建
Pipeline 使用包含 (key, value) 对的列表进行初始化。 key 是特征工程方法的类名,value 是此方法的初始化参数。
该 Pipeline 由以下部分组成:
用于异常值检测的 Ksigma 特征工程方法。
用于生成时间特征的 TimeFeatureGenerator 特征工程方法。
>>> from paddlets.pipeline.pipeline import Pipeline
>>> from paddlets.transform import KSigma, TimeFeatureGenerator
>>> pipeline = Pipeline([(KSigma, {"cols":["observed_a", "observed_b", "known_c", "known_d"], "k": 1}), (TimeFeatureGenerator, {})])
1.3. 执行特征工程
拟合 Pipeline 并执行特征工程。
>>> pipeline.fit(train_dataset)
>>> tsdataset_preprocessed = pipeline.transform(test_dataset)
>>> tsdataset_preprocessed
target observed_a observed_b known_c known_d year month day weekday hour quarter dayofyear weekofyear is_holiday is_workday
2022-01-01 01:45:00 1.861470 0.342490 -0.488575 0.047618 -0.452255 2022 1 1 5 1 1 1 52 1.0 0.0
2022-01-01 02:00:00 -0.942302 0.198202 0.527947 0.228248 0.102397 2022 1 1 5 2 1 1 52 1.0 0.0
2022-01-01 02:15:00 0.754010 0.013713 -0.105994 0.687319 0.240901 2022 1 1 5 2 1 1 52 1.0 0.0
2022-01-01 02:30:00 NaN 0.013713 0.119287 0.075458 -0.922555 2022 1 1 5 2 1 1 52 1.0 0.0
2022-01-01 02:45:00 NaN NaN NaN 0.047618 0.362322 2022 1 1 5 2 1 1 52 1.0 0.0
2022-01-01 03:00:00 NaN NaN NaN -0.126352 0.046445 2022 1 1 5 3 1 1 52 1.0 0.0
2022-01-01 03:15:00 NaN NaN NaN 0.047618 0.046445 2022 1 1 5 3 1 1 52 1.0 0.0
2022-01-01 03:30:00 NaN NaN NaN -0.852201 0.476529 2022 1 1 5 3 1 1 52 1.0 0.0
2. 组装含有学习器的工作流
Pipeline 的最后一个步骤可以是一个学习器,你仅需调用一次 fit 即可实现 Pipeline 中的各个特征工程方法与学习器对数据的拟合。
2.1. 构建
该 Pipeline 由以下部分组成:
用于异常值检测的 Ksigma 特征工程方法。
用于生成时间特征的 TimeFeatureGenerator 特征工程方法。
用于对时序数据建立时序模型的 MLPRegressor 学习器。
>>> from paddlets.models.forecasting import MLPRegressor
>>> mlp_params = {
>>> 'in_chunk_len': 3,
>>> 'out_chunk_len': 2,
>>> 'skip_chunk_len': 0,
>>> 'eval_metrics': ["mse", "mae"]
>>> }
>>> pipeline = Pipeline([(KSigma, {"cols":["observed_a", "observed_b", "known_c", "known_d"], "k": 1}), (MLPRegressor, mlp_params)])
2.2. 拟合 Pipeline 并执行预测
对于拟合后的 Pipeline,你可以使用 pipeline.predict 进行时序预测或使用 recursive_predict 进行多步时序预测。
>>> pipeline.fit(train_dataset)
>>> predicted_results = pipeline.predict(train_dataset)
>>> predicted_results
target
2022-01-01 01:45:00 -0.034728
2022-01-01 02:00:00 0.156984
2.3. 递归多步预测
递归多步预测的策略是:一步步利用 pipeline.predict 方法实现多步时序预测。当前时刻的预测结果会被添加至 TSDataset 时序数据集 的目标列中,并在模型预测时,被添加至历史目标值的滑动窗口中用于下一时刻预测。
注意: pipeline.recursive_predict 函数在 pipeline.skip_chunk != 0 时,无法使用。
注意:每次调用 pipeline.predict 的输出长度为 out_chunk_len, 所以 pipeline.predict 会被调用 ceiling(predict_length/out_chunk_len) 次,以满足多步时序的输出长度需求。例如,上述例子中的 Pipeline 的 out_chunk_length 是2,而 recursive_predict 允许你将 predict_length 设置为5或者更多:
>>> train_dataset.set_observed_cov(TimeSeries.concat([train_dataset.observed_cov, test_dataset.observed_cov]))
>>> train_dataset
target observed_a observed_b known_c known_d
2022-01-01 00:00:00 0.222311 -0.277376 0.546331 -1.408227 0.662035
2022-01-01 00:15:00 0.317041 0.854092 -1.857899 0.314928 -0.767439
2022-01-01 00:30:00 1.513104 0.379383 0.850350 -0.909959 -1.331936
2022-01-01 00:45:00 0.598694 -0.445081 -1.326147 0.749286 1.723710
2022-01-01 01:00:00 -0.387747 -0.621718 -1.689694 1.437675 -0.621165
2022-01-01 01:15:00 0.476731 0.890895 0.058239 -0.487614 0.668113
2022-01-01 01:30:00 0.219381 -0.684207 -0.001203 -0.199150 1.221772
2022-01-01 01:45:00 NaN 0.342490 2.156751 1.413992 -0.452255
2022-01-01 02:00:00 NaN 0.198202 0.527947 0.228248 0.102397
2022-01-01 02:15:00 NaN -1.400311 -0.105994 0.687319 0.240901
2022-01-01 02:30:00 NaN 1.359987 0.119287 0.075458 -0.922555
2022-01-01 02:45:00 NaN NaN NaN -1.718082 0.362322
2022-01-01 03:00:00 NaN NaN NaN -0.126352 1.376127
2022-01-01 03:15:00 NaN NaN NaN 1.508944 -2.041886
2022-01-01 03:30:00 NaN NaN NaN -0.852201 0.476529
>>> recursive_predicted_results = pipeline.recursive_predict(train_dataset, predict_length=5)
>>> recursive_predicted_results
target
2022-01-01 01:45:00 -0.034728
2022-01-01 02:00:00 0.156984
2022-01-01 02:15:00 0.290443
2022-01-01 02:30:00 -0.007422
2022-01-01 02:45:00 0.025956
当known_cov或者observed_cov存在时,known_cov时间长度需要大于等于 递归预测步数*预测长度 ,observed_cov的时间长度需要大于等于 (递归预测步数-1)*预测长度 ,以满足预测时特征构建的需求。
注意: 在递归预测中,预测误差会被累计,因此预测性能会随着预测长度的增加而下降。
更多使用细节,请参考:API: pipeline.recursive_predict
3. 持久化
与其他的 PaddleTS 模型相似,Pipeline 提供了 save() 和 load() 函数以支持持久化的需要。
3.1. 保存
>>> pipeline.save(path="./")
3.2. 加载
>>> pipeline.load(path="./")
>>> predicted_results = pipeline.predict(train_dataset)