paddlets.models.data_adapter

class SampleDataset(rawdataset: TSDataset, in_chunk_len: int = 1, out_chunk_len: int = 0, skip_chunk_len: int = 0, sampling_stride: int = 1, fill_last_value: Optional[Union[floating, integer]] = None, time_window: Optional[Tuple[int, int]] = None)[源代码]

基类:Dataset

一个paddle Dataset数据集接口的实现。

time_window默认认为每条样本同时包含特征X时间块(即 in_chunk), 跳过的时间块(即 skip_chunk)以及标签Y(即 out_chunk)。

如果调用者显式地传入time_window参数,并且time_window窗口的上界大于 len(TSDataset._target) - 1, 则意味着构建出的样本将仅包含特征X(即 in_chunk),而不会包含跳过的时间块(即 skip_chunk)或者标签Y(即 out_chunk)。

参数
  • rawdataset (TSDataset) – 待转换的TSDataset数据集。

  • in_chunk_len (int) – 模型输入的时间序列长度。

  • out_chunk_len (int) – 模型输出的序列长度。

  • skip_chunk_len (int) – 可选变量, 输入序列与输出序列之间跳过的序列长度,既不作为特征也不作为预测目标使用,默认值为0。

  • sampling_stride (int) – 在第i条样本和第i+1条样本之间跨越的时间步数。 具体来说,令 t 为target时序数据的时间索引,t[i] 为第i条样本的起始时间,t[i+1]`为第i+1条样本的起始时间, 则`sampling_stride`代表 `t[i+1] - t[i] 的计算结果,即2条相邻的样本之间相差的时间点的数量。

  • fill_last_value (float, optional) – 用于填充最后一条样本的值。如果无需填充最后一条样本,请将该值设置为None。

  • time_window (Tuple, optional) – 一个包含2个元素的元组类型的时间窗口,允许adapter模块在其范围内构建样本。 time_window[0] 值代表窗口范围的下界,time_window[1] 值代表窗口范围的上界。 对于每一个包含在该左闭右闭范围内的元素,都代表一条样本的尾部索引。

实际案例

# 1) in_chunk_len examples
# Given:
tsdataset.target = [0, 1, 2, 3, 4]
skip_chunk_len = 0
out_chunk_len = 1

# 1.1) If in_chunk_len = 1, sample[0]:
# X -> skip_chunk -> Y
# (0) -> () -> (1)

# 1.2) If in_chunk_len = 2, sample[0]:
# X -> skip_chunk -> Y
# (0, 1) -> () -> (2)

# 1.3) If in_chunk_len = 3, sample[0]:
# X -> skip_chunk -> Y
# (0, 1, 2) -> () -> (3)
# 2) out_chunk_len examples
# Given:
tsdataset.target = [0, 1, 2, 3, 4]
in_chunk_len = 1
skip_chunk_len = 0

# 2.1) If out_chunk_len = 1, sample[0]:
# X -> skip_chunk -> Y
# (0) -> () -> (1)

# 2.2) If out_chunk_len = 2, sample[0]:
# X -> skip_chunk -> Y
# (0) -> () -> (1, 2)

# 2.3) If out_chunk_len = 3, sample[0]:
# X -> skip_chunk -> Y
# (0) -> () -> (1, 2, 3)
# 3) skip_chunk_len examples
# Given:
tsdataset.target = [0, 1, 2, 3, 4]
in_chunk_len = 1
out_chunk_len = 1

# 3.1) If skip_chunk_len = 0, sample[0]:
# X -> skip_chunk -> Y
# (0) -> () -> (1)

# 3.2) If skip_chunk_len = 1, sample[0]:
# X -> skip_chunk -> Y
# (0) -> (1) -> (2)

# 3.3) If skip_chunk_len = 2, sample[0]:
# X -> skip_chunk -> Y
# (0) -> (1, 2) -> (3)

# 3.4) If skip_chunk_len = 3, sample[0]:
# X -> skip_chunk -> Y
# (0) -> (1, 2, 3) -> (4)
# 4) sampling_stride examples
# Given:
tsdataset.target = [0, 1, 2, 3, 4]
in_chunk_len = 1
skip_chunk_len = 0
out_chunk_len = 1

# 4.1) If sampling_stride = 1, samples:
# X -> skip_chunk -> Y
# (0) -> () -> (1)
# (1) -> () -> (2)
# (2) -> () -> (3)
# (3) -> () -> (4)

# 4.2) If sampling_stride = 2, samples:
# X -> skip_chunk -> Y
# (0) -> () -> (1)
# (2) -> () -> (3)

# 4.3) If sampling_stride = 3, samples:
# X -> skip_chunk -> Y
# (0) -> () -> (1)
# (3) -> () -> (4)
# 5) time_window examples:
# 5.1) The default time_window calculation formula is as follows:
# time_window[0] = 0 + in_chunk_len + skip_chunk_len + (out_chunk_len - 1)
# time_window[1] = max_target_idx
#
# Given:
tsdataset.target = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
in_chunk_len = 4
skip_chunk_len = 3
out_chunk_len = 2
sampling_stride = 1

# The following equation holds:
max_target_idx = tsdataset.target[-1] = 10

# The default time_window is calculated as follows:
time_window[0] = 0 + 2 + 3 + (4 - 1) = 5 + 3 = 8
time_window[1] = max_target_idx = 10
time_window = (8, 10)

# 3 samples will be built in total:
X -> Y
(0, 1, 2, 3) -> (7, 8)
(1, 2, 3, 4) -> (8, 9)
(2, 3, 4, 5) -> (9, 10)


# 5.2) Each element in time_window refers to the TAIL index of each sample, but NOT the HEAD index.
# The following two scenarios shows how to pass in the expected time_window parameter to build samples.
# Given:
tsdataset.target = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
in_chunk_len = 4
skip_chunk_len = 3
out_chunk_len = 2

# Scenario 5.2.1 - Suppose the following training samples are expected to be built:
# X -> Y
# (0, 1, 2, 3) -> (7, 8)
# (1, 2, 3, 4) -> (8, 9)
# (2, 3, 4, 5) -> (9, 10)

# The 1st sample's tail index is 8
# The 2nd sample's tail index is 9
# The 3rd sample's tail index is 10

# Thus, the time_window parameter should be as follows:
time_window = (8, 10)

# All other time_window showing up as follows are NOT correct:
time_window = (0, 2)
time_window = (0, 10)

# Scenario 5.2.2 - Suppose the following predict sample is expected to be built:
# X -> Y
# (7, 8, 9, 10) -> (14, 15)

# The first (i.e. the last) sample's tail index is 15;

# Thus, the time_window parameter should be as follows:
time_window = (15, 15)

# 5.3) The calculation formula of the max allowed time_window upper bound is as follows:
# time_window[1] <= len(tsdataset.target) - 1 + skip_chunk_len + out_chunk_len
# The reason is that the built paddle.io.Dataset is used for a single call of :func: `model.predict`, as
# it only allow for a single predict sample, any time_window upper bound larger than a single predict
# sample's TAIL index will not be allowed because there is not enough target time series to build past
# target time series chunk.
#
# Given:
tsdataset.target = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
in_chunk_len = 4
skip_chunk_len = 3
out_chunk_len = 2

# For a single :func:`model.predict` call:
X = in_chunk = (7, 8, 9, 10)

# max allowed time_window[1] is calculated as follows:
time_window[1] <= len(tsdataset) - 1 + skip_chunk_len + out_chunk_len = 11 - 1 + 3 + 2 = 15

# Note that time_window[1] (i.e. 15) is larger than the max_target_idx (i.e. 10), but this time_window
# upper bound is still valid, because predict sample does not need skip_chunk (i.e.  [11, 12, 13]) or
# out_chunk (i.e. [14, 15]).

# Any values larger than 15 (i.e. 16) is invalid, because the existing target time series is NOT long
# enough to build X for the prediction sample, see following example:
# Given:
time_window = (16, 16)

# The calculated out_chunk = (15, 16)
# The calculated skip_chunk = (12, 13, 14)

# Thus, the in_chunk should be [8, 9, 10, 11]
# However, the tail index of the calculated in_chunk 11 is beyond the max target time series
# (i.e. tsdataset.target[-1] = 10), so current target timeseries cannot provide 11 to build this sample.
class MLDataLoader(dataset: SampleDataset, batch_size: int, collate_fn: Optional[Callable[[List[Dict[str, ndarray]]], Dict[str, ndarray]]] = None)[源代码]

基类:object

机器学习样本数据加载器,为MLDataset样本数据集提供一个可批量迭代能力。

MLDataLoader支持可迭代类型的数据集,其会使用单进程批量加载样本,如果用户传入自定义的整合函数,则也会应用于每次迭代的小批量样本之上。

参数
  • dataset (SampleDataset) – 待构建的SampleDataset样本数据集。

  • batch_size (int) – 单次批量迭代的样本数。

  • collate_fn (Callable, optional) – 可选参数,用户自定义的应用于每个小批量样本的整合函数。

class DataAdapter[源代码]

基类:object

深度/非深度时序模型的样本数据构建类,用于将TSDataset转换为SampleDataset样本数据集和DataLoader批量样本加载器。

to_sample_dataset(rawdataset: TSDataset, in_chunk_len: int = 1, out_chunk_len: int = 0, skip_chunk_len: int = 0, sampling_stride: int = 1, fill_last_value: Optional[Union[floating, integer]] = None, time_window: Optional[Tuple[int, int]] = None) SampleDataset[源代码]

将 TSDataset 数据集转换为 SampleDataset 样本数据集。

参数
  • rawdataset (TSDataset) – 待转换的TSDataset数据集。

  • in_chunk_len (int) – 模型输入的时间序列长度。

  • out_chunk_len (int) – 模型输出的序列长度。

  • skip_chunk_len (int) – 可选变量, 输入序列与输出序列之间跳过的序列长度,既不作为特征也不作为预测目标使用,默认值为0。

  • sampling_stride (int) – 在第i条样本和第i+1条样本之间跨越的时间步数。 具体来说,令 t 为target时序数据的时间索引,t[i] 为第i条样本的起始时间,t[i+1]`为第i+1条样本的起始时间, 则`sampling_stride`代表 `t[i+1] - t[i] 的计算结果,即2条相邻的样本之间相差的时间点的数量。

  • fill_last_value (float, optional) – 用于填充最后一条样本的值。如果无需填充最后一条样本,请将该值设置为None。

  • time_window (Tuple, optional) – 一个包含2个元素的元组类型的时间窗口,允许adapter模块在其范围内构建样本。 time_window[0] 值代表窗口范围的下界,time_window[1] 值代表窗口范围的上界。 对于每一个包含在该左闭右闭范围内的元素,都代表一条样本的尾部索引。

实际案例

samples = [
    {
        "past_target": np.ndarray(
            shape=(in_chunk_len, target_col_num)
        ),
        "future_target": np.ndarray(
            shape=(out_chunk_len, target_col_num)
        ),
        "known_cov_numeric": np.ndarray(
            shape=(in_chunk_len + out_chunk_len, known_numeric_col_num)
        ),
        "known_cov_categorical": np.ndarray(
            shape=(in_chunk_len + out_chunk_len, known_categorical_col_num)
        ),
        "observed_cov_numeric": np.ndarray(
            shape=(in_chunk_len, observed_numeric_col_num)
        ),
        "observed_cov_categorical": np.ndarray(
            shape=(in_chunk_len + out_chunk_len, observed_categorical_col_num)
        ),
        "static_cov_numeric": np.ndarray(
            shape=(in_chunk_len + out_chunk_len, static_numeric_col_num)
        ),
        "static_cov_categorical": np.ndarray(
            shape=(in_chunk_len + out_chunk_len, static_categorical_col_num)
        )
    },
    # ...
]
to_paddle_dataloader(sample_dataset: SampleDataset, batch_size: int, collate_fn: Optional[Callable] = None, shuffle: bool = True, drop_last: bool = False) DataLoader[源代码]

将 SampleDataset 样本数据集转换为 paddle DataLoader 批量样本加载器。

参数
  • sample_dataset (SampleDataset) – 等待被转换的SampleDataset样本数据集。

  • batch_size (int) – 单次批量迭代的样本数。

  • collate_fn (Callable, optional) – 用户自定义的批量样本整合函数,可选。

  • shuffle (bool, optional) – 是否在生成批量样本之前对数据进行洗牌。默认为True。

  • drop_last (bool, optional) – 如果剩余数据不满足一个batch size,是否丢弃这些剩余数据。默认为False。

返回

构建完成的paddle DataLoader批量样本加载器。

返回类型

PaddleDataLoader

实际案例

dataloader = [
    # 1st batch
    {
        "past_target": paddle.Tensor(
            shape=(batch_size, in_chunk_len, target_col_num)
        ),
        "future_target": paddle.Tensor(
            shape=(batch_size, out_chunk_len, target_col_num)
        ),
        "known_cov_numeric": paddle.Tensor(
            shape=(batch_size, known_cov_chunk_len, known_cov_numeric_col_num)
        ),
        "known_cov_categorical": paddle.Tensor(
            shape=(batch_size, known_cov_chunk_len, known_cov_categorical_col_num)
        ),
        "observed_cov_numeric": paddle.Tensor(
            shape=(batch_size, observed_cov_chunk_len, observed_cov_col_num)
        ),
        "observed_cov_categorical": paddle.Tensor(
            shape=(batch_size, observed_cov_chunk_len, observed_cov_categorical_col_num)
        ),
        "static_cov_numeric": paddle.Tensor(
            shape=(batch_size, 1, static_cov_numeric_col_num)
        ),
        "static_cov_categorical": paddle.Tensor(
            shape=(batch_size, 1, static_cov_categorical_col_num)
        )
    },

    # ...
]
to_ml_dataloader(sample_dataset: SampleDataset, batch_size: int, collate_fn: Optional[Callable] = None) MLDataLoader[源代码]

将 SampleDataset 样本数据集转换为 MLDataLoader 批量样本加载器。

参数
  • sample_dataset (SampleDataset) – 等待被转换的SampleDataset样本数据集。

  • batch_size (int) – 单次批量迭代的样本数。

  • collate_fn (Callable, optional) – 用户自定义的批量样本整合函数,可选。

返回

构建完成的MLDataLoader批量样本加载器。

返回类型

MLDataLoader

实际案例

dataloader = [
    # 1st batch
    {
        "past_target": np.ndarray(
            shape=(batch_size, in_chunk_len, target_col_num)
        ),
        "future_target": np.ndarray(
            shape=(batch_size, out_chunk_len, target_col_num)
        ),
        "known_cov_numeric": np.ndarray(
            shape=(batch_size, known_cov_chunk_len, known_cov_numeric_col_num)
        ),
        "known_cov_categorical": np.ndarray(
            shape=(batch_size, known_cov_chunk_len, known_cov_categorical_col_num)
        ),
        "observed_cov_numeric": np.ndarray(
            shape=(batch_size, observed_cov_chunk_len, observed_cov_col_num)
        ),
        "observed_cov_categorical": np.ndarray(
            shape=(batch_size, observed_cov_chunk_len, observed_cov_categorical_col_num)
        ),
        "static_cov_numeric": np.ndarray(
            shape=(batch_size, 1, static_cov_numeric_col_num)
        ),
        "static_cov_categorical": np.ndarray(
            shape=(batch_size, 1, static_cov_categorical_col_num)
        )
    },

    # ...
]