paddlets.models.data_adapter
- class SampleDataset(rawdataset: TSDataset, in_chunk_len: int = 1, out_chunk_len: int = 0, skip_chunk_len: int = 0, sampling_stride: int = 1, fill_last_value: Optional[Union[floating, integer]] = None, time_window: Optional[Tuple[int, int]] = None)
Bases: Dataset
An implementation of the paddle Dataset interface.
By default, time_window assumes that each sample contains the feature chunk X (in_chunk), the skipped chunk (skip_chunk), and the label chunk Y (out_chunk).
If the caller explicitly passes time_window and its upper bound is greater than len(TSDataset._target) - 1, the built samples contain only the feature chunk X (in_chunk), and not the skipped chunk (skip_chunk) or the label chunk Y (out_chunk).
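- Parameters
rawdataset (TSDataset) – The TSDataset to convert.
in_chunk_len (int) – Length of the time series the model takes as input.
out_chunk_len (int) – Length of the sequence the model outputs.
skip_chunk_len (int) – Optional. Length of the sequence skipped between the input and output sequences; it is used neither as a feature nor as a prediction target. Defaults to 0.
sampling_stride (int) – The number of time steps between sample i and sample i+1. Specifically, let t be the time index of the target time series, t[i] the start time of the i-th sample, and t[i+1] the start time of the (i+1)-th sample; then sampling_stride equals t[i+1] - t[i], i.e. the number of time points between two adjacent samples.
fill_last_value (float, optional) – The value used to fill the last sample. Set it to None if the last sample does not need to be filled.
time_window (Tuple, optional) – A two-element tuple defining the time window within which the adapter module is allowed to build samples. time_window[0] is the lower bound of the window and time_window[1] is the upper bound. The range is closed on both ends, and each element in it is the tail index of one sample.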
Examples
    # 1) in_chunk_len examples
    # Given:
    tsdataset.target = [0, 1, 2, 3, 4]
    skip_chunk_len = 0
    out_chunk_len = 1

    # 1.1) If in_chunk_len = 1, sample[0]:
    # X -> skip_chunk -> Y
    # (0) -> () -> (1)

    # 1.2) If in_chunk_len = 2, sample[0]:
    # X -> skip_chunk -> Y
    # (0, 1) -> () -> (2)

    # 1.3) If in_chunk_len = 3, sample[0]:
    # X -> skip_chunk -> Y
    # (0, 1, 2) -> () -> (3)

    # 2) out_chunk_len examples
    # Given:
    tsdataset.target = [0, 1, 2, 3, 4]
    in_chunk_len = 1
    skip_chunk_len = 0

    # 2.1) If out_chunk_len = 1, sample[0]:
    # X -> skip_chunk -> Y
    # (0) -> () -> (1)

    # 2.2) If out_chunk_len = 2, sample[0]:
    # X -> skip_chunk -> Y
    # (0) -> () -> (1, 2)

    # 2.3) If out_chunk_len = 3, sample[0]:
    # X -> skip_chunk -> Y
    # (0) -> () -> (1, 2, 3)

    # 3) skip_chunk_len examples
    # Given:
    tsdataset.target = [0, 1, 2, 3, 4]
    in_chunk_len = 1
    out_chunk_len = 1

    # 3.1) If skip_chunk_len = 0, sample[0]:
    # X -> skip_chunk -> Y
    # (0) -> () -> (1)

    # 3.2) If skip_chunk_len = 1, sample[0]:
    # X -> skip_chunk -> Y
    # (0) -> (1) -> (2)

    # 3.3) If skip_chunk_len = 2, sample[0]:
    # X -> skip_chunk -> Y
    # (0) -> (1, 2) -> (3)

    # 3.4) If skip_chunk_len = 3, sample[0]:
    # X -> skip_chunk -> Y
    # (0) -> (1, 2, 3) -> (4)

    # 4) sampling_stride examples
    # Given:
    tsdataset.target = [0, 1, 2, 3, 4]
    in_chunk_len = 1
    skip_chunk_len = 0
    out_chunk_len = 1

    # 4.1) If sampling_stride = 1, samples:
    # X -> skip_chunk -> Y
    # (0) -> () -> (1)
    # (1) -> () -> (2)
    # (2) -> () -> (3)
    # (3) -> () -> (4)

    # 4.2) If sampling_stride = 2, samples:
    # X -> skip_chunk -> Y
    # (0) -> () -> (1)
    # (2) -> () -> (3)

    # 4.3) If sampling_stride = 3, samples:
    # X -> skip_chunk -> Y
    # (0) -> () -> (1)
    # (3) -> () -> (4)
    # 5) time_window examples:
    # 5.1) The default time_window is calculated as follows:
    # time_window[0] = 0 + in_chunk_len + skip_chunk_len + (out_chunk_len - 1)
    # time_window[1] = max_target_idx
    #
    # Given:
    tsdataset.target = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    in_chunk_len = 4
    skip_chunk_len = 3
    out_chunk_len = 2
    sampling_stride = 1

    # The following equation holds:
    max_target_idx = len(tsdataset.target) - 1 = 10

    # The default time_window is calculated as follows:
    time_window[0] = 0 + 4 + 3 + (2 - 1) = 7 + 1 = 8
    time_window[1] = max_target_idx = 10
    time_window = (8, 10)

    # 3 samples will be built in total:
    # X -> Y
    # (0, 1, 2, 3) -> (7, 8)
    # (1, 2, 3, 4) -> (8, 9)
    # (2, 3, 4, 5) -> (9, 10)

    # 5.2) Each element in time_window refers to the TAIL index of a sample, NOT the HEAD index.
    # The following two scenarios show how to pass the time_window parameter to build the expected samples.
    # Given:
    tsdataset.target = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    in_chunk_len = 4
    skip_chunk_len = 3
    out_chunk_len = 2

    # Scenario 5.2.1 - Suppose the following training samples are expected to be built:
    # X -> Y
    # (0, 1, 2, 3) -> (7, 8)
    # (1, 2, 3, 4) -> (8, 9)
    # (2, 3, 4, 5) -> (9, 10)

    # The 1st sample's tail index is 8
    # The 2nd sample's tail index is 9
    # The 3rd sample's tail index is 10

    # Thus, the time_window parameter should be as follows:
    time_window = (8, 10)

    # The following time_window values are NOT correct:
    time_window = (0, 2)
    time_window = (0, 10)

    # Scenario 5.2.2 - Suppose the following predict sample is expected to be built:
    # X -> Y
    # (7, 8, 9, 10) -> (14, 15)

    # The first (i.e. the last) sample's tail index is 15;

    # Thus, the time_window parameter should be as follows:
    time_window = (15, 15)

    # 5.3) The maximum allowed time_window upper bound is calculated as follows:
    # time_window[1] <= len(tsdataset.target) - 1 + skip_chunk_len + out_chunk_len
    # The reason is that the built paddle.io.Dataset is used for a single call of :func:`model.predict`,
    # which only allows a single predict sample. Any time_window upper bound larger than that sample's
    # TAIL index is not allowed, because the target time series is not long enough to build the past
    # target chunk.
    #
    # Given:
    tsdataset.target = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    in_chunk_len = 4
    skip_chunk_len = 3
    out_chunk_len = 2

    # For a single :func:`model.predict` call:
    X = in_chunk = (7, 8, 9, 10)

    # The max allowed time_window[1] is calculated as follows:
    # time_window[1] <= len(tsdataset.target) - 1 + skip_chunk_len + out_chunk_len = 11 - 1 + 3 + 2 = 15

    # Note that time_window[1] (i.e. 15) is larger than max_target_idx (i.e. 10), but this time_window
    # upper bound is still valid, because a predict sample does not need skip_chunk (i.e. [11, 12, 13]) or
    # out_chunk (i.e. [14, 15]).

    # Any value larger than 15 (e.g. 16) is invalid, because the existing target time series is NOT long
    # enough to build X for the prediction sample. See the following example:
    # Given:
    time_window = (16, 16)

    # The calculated out_chunk = (15, 16)
    # The calculated skip_chunk = (12, 13, 14)

    # Thus, the in_chunk should be [8, 9, 10, 11]
    # However, the tail index of the calculated in_chunk (11) is beyond the max target index
    # (i.e. max_target_idx = 10), so the current target time series cannot provide index 11 to build this sample.
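The time_window rules in the examples above can be restated as a small pure-Python sketch. It is not the paddlets implementation: the function names (default_time_window, max_time_window_upper, build_sample_indices) are hypothetical, the sketch works on integer positions only rather than on a TSDataset, and the lower-bound check is an assumption about reasonable behavior.

```python
from typing import List, Optional, Tuple

# One sample expressed as index lists: (in_chunk, skip_chunk, out_chunk).
Sample = Tuple[List[int], List[int], List[int]]


def default_time_window(target_len: int, in_chunk_len: int,
                        skip_chunk_len: int, out_chunk_len: int) -> Tuple[int, int]:
    """Default window: from the first tail index that fits a full sample to the last target index."""
    lower = 0 + in_chunk_len + skip_chunk_len + (out_chunk_len - 1)
    return lower, target_len - 1


def max_time_window_upper(target_len: int, skip_chunk_len: int, out_chunk_len: int) -> int:
    """Largest allowed tail index: the tail of the single predict sample."""
    return target_len - 1 + skip_chunk_len + out_chunk_len


def build_sample_indices(target_len: int, in_chunk_len: int, skip_chunk_len: int,
                         out_chunk_len: int, sampling_stride: int = 1,
                         time_window: Optional[Tuple[int, int]] = None) -> List[Sample]:
    """Enumerate samples by walking the tail index over the closed time_window range."""
    if time_window is None:
        time_window = default_time_window(
            target_len, in_chunk_len, skip_chunk_len, out_chunk_len)
    lower, upper = time_window
    if upper > max_time_window_upper(target_len, skip_chunk_len, out_chunk_len):
        raise ValueError("time_window upper bound leaves no room to build in_chunk")
    # Assumption: a tail index too small to fit a full sample is rejected.
    if lower < in_chunk_len + skip_chunk_len + out_chunk_len - 1:
        raise ValueError("time_window lower bound is too small for one sample")

    samples: List[Sample] = []
    for tail in range(lower, upper + 1, sampling_stride):
        out_start = tail - out_chunk_len + 1
        skip_start = out_start - skip_chunk_len
        in_start = skip_start - in_chunk_len
        samples.append((
            list(range(in_start, skip_start)),   # in_chunk (X)
            list(range(skip_start, out_start)),  # skip_chunk
            list(range(out_start, tail + 1)),    # out_chunk (Y)
        ))
    return samples


if __name__ == "__main__":
    for x, _, y in build_sample_indices(11, 4, 3, 2):
        print(x, "->", y)
```

For a window whose upper bound exceeds the last target index, such as (15, 15) here, the skip_chunk and out_chunk positions returned by the sketch do not exist in the data; as described above, only in_chunk would be materialized for such a predict sample.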
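- class MLDataLoader(dataset: SampleDataset, batch_size: int, collate_fn: Optional[Callable[[List[Dict[str, ndarray]]], Dict[str, ndarray]]] = None)
Bases: object
Machine learning sample data loader that provides batched iteration over a SampleDataset sample dataset.
MLDataLoader supports iterable-style datasets and loads samples in batches using a single process. If the user passes a custom collate function, it is applied to the mini-batch produced on each iteration.
- Parameters
dataset (SampleDataset) – The SampleDataset sample dataset to load batches from.
batch_size (int) – The number of samples per batch.
collate_fn (Callable, optional) – Optional. A user-defined collate function applied to each mini-batch of samples.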
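The batching behavior described above can be pictured with a minimal pure-Python sketch. The names iter_batches and default_collate are hypothetical and are not part of paddlets; plain lists stand in for ndarray values, and keeping the final partial batch is an assumption rather than documented MLDataLoader behavior.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence

Sample = Dict[str, List[float]]


def default_collate(batch: List[Sample]) -> Dict[str, List[List[float]]]:
    """Turn a list of per-sample dicts into one dict of per-key lists."""
    return {key: [sample[key] for sample in batch] for key in batch[0]}


def iter_batches(samples: Sequence[Sample], batch_size: int,
                 collate_fn: Optional[Callable[[List[Sample]], Any]] = None) -> Iterator[Any]:
    """Yield mini-batches in order, in a single process, applying the collate function to each."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    collate = collate_fn if collate_fn is not None else default_collate
    for start in range(0, len(samples), batch_size):
        # Assumption: the last, possibly smaller, batch is kept.
        yield collate(list(samples[start:start + batch_size]))


if __name__ == "__main__":
    data = [{"past_target": [float(i)], "future_target": [float(i + 1)]} for i in range(5)]
    for batch in iter_batches(data, batch_size=2):
        print(batch)
```

Passing a custom collate_fn replaces only the per-batch grouping step; the iteration order and batch boundaries stay the same.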
- class DataAdapter
Bases: object
Sample data builder for deep and non-deep time series models. It converts a TSDataset into a SampleDataset sample dataset and a DataLoader batch loader.
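- to_sample_dataset(rawdataset: TSDataset, in_chunk_len: int = 1, out_chunk_len: int = 0, skip_chunk_len: int = 0, sampling_stride: int = 1, fill_last_value: Optional[Union[floating, integer]] = None, time_window: Optional[Tuple[int, int]] = None) -> SampleDataset
Converts a TSDataset into a SampleDataset sample dataset.
- Parameters
rawdataset (TSDataset) – The TSDataset to convert.
in_chunk_len (int) – Length of the time series the model takes as input.
out_chunk_len (int) – Length of the sequence the model outputs.
skip_chunk_len (int) – Optional. Length of the sequence skipped between the input and output sequences; it is used neither as a feature nor as a prediction target. Defaults to 0.
sampling_stride (int) – The number of time steps between sample i and sample i+1. Specifically, let t be the time index of the target time series, t[i] the start time of the i-th sample, and t[i+1] the start time of the (i+1)-th sample; then sampling_stride equals t[i+1] - t[i], i.e. the number of time points between two adjacent samples.
fill_last_value (float, optional) – The value used to fill the last sample. Set it to None if the last sample does not need to be filled.
time_window (Tuple, optional) – A two-element tuple defining the time window within which the adapter module is allowed to build samples. time_window[0] is the lower bound of the window and time_window[1] is the upper bound. The range is closed on both ends, and each element in it is the tail index of one sample.
Examples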
    samples = [
        {
            "past_target": np.ndarray(shape=(in_chunk_len, target_col_num)),
            "future_target": np.ndarray(shape=(out_chunk_len, target_col_num)),
            "known_cov_numeric": np.ndarray(shape=(in_chunk_len + out_chunk_len, known_numeric_col_num)),
            "known_cov_categorical": np.ndarray(shape=(in_chunk_len + out_chunk_len, known_categorical_col_num)),
            "observed_cov_numeric": np.ndarray(shape=(in_chunk_len, observed_numeric_col_num)),
            "observed_cov_categorical": np.ndarray(shape=(in_chunk_len + out_chunk_len, observed_categorical_col_num)),
            "static_cov_numeric": np.ndarray(shape=(in_chunk_len + out_chunk_len, static_numeric_col_num)),
            "static_cov_categorical": np.ndarray(shape=(in_chunk_len + out_chunk_len, static_categorical_col_num))
        },
        # ...
    ]
- to_paddle_dataloader(sample_dataset: SampleDataset, batch_size: int, collate_fn: Optional[Callable] = None, shuffle: bool = True, drop_last: bool = False) -> DataLoader
Converts a SampleDataset sample dataset into a paddle DataLoader batch loader.
- Parameters
sample_dataset (SampleDataset) – The SampleDataset sample dataset to convert.
batch_size (int) – The number of samples per batch.
collate_fn (Callable, optional) – Optional. A user-defined function that collates a batch of samples.
shuffle (bool, optional) – Whether to shuffle the data before generating batches. Defaults to True.
drop_last (bool, optional) – Whether to drop the remaining data when it does not fill a complete batch. Defaults to False.
- Returns
The constructed paddle DataLoader batch loader.
- Return type
PaddleDataLoader
Examples
    dataloader = [
        # 1st batch
        {
            "past_target": paddle.Tensor(shape=(batch_size, in_chunk_len, target_col_num)),
            "future_target": paddle.Tensor(shape=(batch_size, out_chunk_len, target_col_num)),
            "known_cov_numeric": paddle.Tensor(shape=(batch_size, known_cov_chunk_len, known_cov_numeric_col_num)),
            "known_cov_categorical": paddle.Tensor(shape=(batch_size, known_cov_chunk_len, known_cov_categorical_col_num)),
            "observed_cov_numeric": paddle.Tensor(shape=(batch_size, observed_cov_chunk_len, observed_cov_col_num)),
            "observed_cov_categorical": paddle.Tensor(shape=(batch_size, observed_cov_chunk_len, observed_cov_categorical_col_num)),
            "static_cov_numeric": paddle.Tensor(shape=(batch_size, 1, static_cov_numeric_col_num)),
            "static_cov_categorical": paddle.Tensor(shape=(batch_size, 1, static_cov_categorical_col_num))
        },
        # ...
    ]
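How shuffle and drop_last affect the batches can be sketched in plain Python. This is only an illustration of the parameter semantics described above, not paddle's DataLoader code; the function name batch_indices and its seed argument are hypothetical and exist only to make the example reproducible.

```python
import random
from typing import List, Optional


def batch_indices(n_samples: int, batch_size: int, shuffle: bool = True,
                  drop_last: bool = False, seed: Optional[int] = None) -> List[List[int]]:
    """Split sample indices into batches, optionally shuffling first and dropping a short tail."""
    indices = list(range(n_samples))
    if shuffle:
        # Shuffle happens before batching, so every batch draws from the whole dataset.
        random.Random(seed).shuffle(indices)
    batches = [indices[i:i + batch_size] for i in range(0, n_samples, batch_size)]
    if drop_last and batches and len(batches[-1]) < batch_size:
        # The remaining samples do not fill a complete batch, so they are discarded.
        batches.pop()
    return batches


if __name__ == "__main__":
    print(batch_indices(10, 4, shuffle=False))
    print(batch_indices(10, 4, shuffle=False, drop_last=True))
```

With 10 samples and batch_size 4, keeping the tail yields a final batch of 2 samples, so the first dimension of the last batch can be smaller than batch_size unless drop_last is True.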
- to_ml_dataloader(sample_dataset: SampleDataset, batch_size: int, collate_fn: Optional[Callable] = None) -> MLDataLoader
Converts a SampleDataset sample dataset into an MLDataLoader batch loader.
- Parameters
sample_dataset (SampleDataset) – The SampleDataset sample dataset to convert.
batch_size (int) – The number of samples per batch.
collate_fn (Callable, optional) – Optional. A user-defined function that collates a batch of samples.
- Returns
The constructed MLDataLoader batch loader.
- Return type
MLDataLoader
Examples
    dataloader = [
        # 1st batch
        {
            "past_target": np.ndarray(shape=(batch_size, in_chunk_len, target_col_num)),
            "future_target": np.ndarray(shape=(batch_size, out_chunk_len, target_col_num)),
            "known_cov_numeric": np.ndarray(shape=(batch_size, known_cov_chunk_len, known_cov_numeric_col_num)),
            "known_cov_categorical": np.ndarray(shape=(batch_size, known_cov_chunk_len, known_cov_categorical_col_num)),
            "observed_cov_numeric": np.ndarray(shape=(batch_size, observed_cov_chunk_len, observed_cov_col_num)),
            "observed_cov_categorical": np.ndarray(shape=(batch_size, observed_cov_chunk_len, observed_cov_categorical_col_num)),
            "static_cov_numeric": np.ndarray(shape=(batch_size, 1, static_cov_numeric_col_num)),
            "static_cov_categorical": np.ndarray(shape=(batch_size, 1, static_cov_categorical_col_num))
        },
        # ...
    ]