Source code for paddlets.models.classify.dl.paddle_base

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

from typing import List, Dict, Any, Callable, Optional, Tuple
from collections import OrderedDict
from copy import deepcopy
import time
import abc
import os
import pickle
import json

from paddle.optimizer import Optimizer
from paddle.nn import CrossEntropyLoss
from sklearn.utils import check_random_state
import numpy as np
import paddle

from paddlets.models.common.callbacks import (
    CallbackContainer,
    EarlyStopping,
    Callback,
    History,
)
from paddlets.metrics import (
    MetricContainer, 
    Metric
)
from paddlets.models.classify.dl.adapter.data_adapter import ClassifyDataAdapter
from paddlets.models.classify.base import BaseClassifier
from paddlets.datasets import TSDataset
from paddlets.logger import raise_if, raise_if_not, raise_log, Logger

logger = Logger(__name__)


class PaddleBaseClassifier(BaseClassifier):
    """Base class for all paddle deep time series classify models.

    Args:
        loss_fn(Callable[..., paddle.Tensor]|None): Loss function.
        optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
        optimizer_params(Dict[str, Any]): Optimizer parameters.
        eval_metrics(List[str]): Evaluation metrics of model.
        callbacks(List[Callback]): Customized callback functions.
        batch_size(int): Number of samples per batch.
        max_epochs(int): Max epochs during training.
        verbose(int): Verbosity mode.
        patience(int): Number of epochs to wait for improvement before terminating.
        seed(int|None): Global random seed.

    Attributes:
        _loss_fn(Callable[..., paddle.Tensor]|None): Loss function.
        _optimizer_fn(Callable[..., Optimizer]): Optimizer algorithm.
        _optimizer_params(Dict[str, Any]): Optimizer parameters.
        _eval_metrics(List[str]): Evaluation metrics of model.
        _callbacks(List[Callback]): Customized callback functions.
        _batch_size(int): Number of samples per batch.
        _max_epochs(int): Max epochs during training.
        _verbose(int): Verbosity mode.
        _patience(int): Number of epochs to wait for improvement before terminating.
        _seed(int|None): Global random seed.
        _classes_(ndarray): ndarray of class labels, possibly strings.
        _n_classes(int): Number of unique labels, set while building the training dataloader.
        _n_class(int): Alias of `_n_classes`, kept for backward compatibility.
        _stop_training(bool): Training status.
        _fit_params(Dict[str, Any]): Infer parameters by TSdataset automatically.
        _network(paddle.nn.Layer): Network structure.
        _optimizer(Optimizer): Optimizer.
        _metrics(List[Metric]): List of metric instance.
        _metrics_names(List[str]): List of metric names.
        _metric_container_dict(Dict[str, MetricContainer]): Dict of metric container.
        _history(History): Callback that records events into a `History` object.
        _callback_container(CallbackContainer): Container holding a list of callbacks.
    """

    def __init__(
        self,
        loss_fn: Optional[Callable[..., paddle.Tensor]] = None,
        optimizer_fn: Callable[..., Optimizer] = paddle.optimizer.Adam,
        optimizer_params: Dict[str, Any] = dict(learning_rate=1e-3),
        eval_metrics: List[str] = [],
        callbacks: List[Callback] = [],
        batch_size: int = 32,
        max_epochs: int = 10,
        verbose: int = 1,
        patience: int = 4,
        seed: Optional[int] = None,
    ):
        super(PaddleBaseClassifier, self).__init__()
        self._loss_fn = loss_fn
        self._optimizer_fn = optimizer_fn
        # The mutable default arguments are never mutated: each one is deep-copied here,
        # so instances do not share state through them.
        self._optimizer_params = deepcopy(optimizer_params)
        self._eval_metrics = deepcopy(eval_metrics)
        self._callbacks = deepcopy(callbacks)
        self._batch_size = batch_size
        self._max_epochs = max_epochs
        self._verbose = verbose
        self._patience = patience
        self._seed = seed

        # State that is only populated by fit() / load().
        self._stop_training = False
        self._fit_params = None
        self._network = None
        self._optimizer = None
        self._metrics = None
        self._metrics_names = None
        self._metric_container_dict = None
        self._history = None
        self._callback_container = None
        self._classes_ = []
        # `_init_fit_dataloaders` stores the class count as `_n_classes`; `_n_class` is the
        # historically documented name. Both are initialised so either can be read before fit.
        self._n_class = 0
        self._n_classes = 0

        # Parameter check.
        self._check_params()
        if seed is not None:
            paddle.seed(seed)

    def _check_params(self):
        """Parameter validity verification.

        Check logic:

            batch_size: batch_size must be > 0.

            max_epochs: max_epochs must be > 0.

            verbose: verbose must be > 0.

            patience: patience must be >= 0.
        """
        raise_if(self._batch_size <= 0, f"batch_size must be > 0, got {self._batch_size}.")
        raise_if(self._max_epochs <= 0, f"max_epochs must be > 0, got {self._max_epochs}.")
        raise_if(self._verbose <= 0, f"verbose must be > 0, got {self._verbose}.")
        raise_if(self._patience < 0, f"patience must be >= 0, got {self._patience}.")

        # If user does not specify an evaluation standard, a metric is provided by default.
        # NOTE(review): "mse" is an unusual default for a classifier - confirm this is intended.
        if not self._eval_metrics:
            self._eval_metrics = ["mse"]

    def _check_tsdatasets(
        self,
        tsdatasets: List[TSDataset],
        labels: np.ndarray
    ):
        """Run `check_tsdataset` on every dataset of the list.

        Args:
            tsdatasets(List[TSDataset]): Data to be checked.
            labels(np.ndarray): The data class labels. Currently not inspected.
        """
        for tsdataset in tsdatasets:
            self.check_tsdataset(tsdataset)

    def check_tsdataset(self, tsdataset: TSDataset):
        """Ensure the robustness of input data (consistent feature order), at the same time,
            check whether the data types are compatible. If not, the processing logic is as follows.

        1> Floating: Convert to np.float32.

        2> Missing value: Warning.

        3> Other: Illegal.

        Args:
            tsdataset(TSDataset): Data to be checked.

        Raises:
            TypeError: Raised through `raise_log` when a column has a non-floating dtype.
        """
        new_dtypes = {}
        for column, dtype in tsdataset.dtypes.items():
            if np.issubdtype(dtype, np.floating):
                new_dtypes.update({column: "float32"})
            else:
                msg = f"{dtype} data type not supported, the illegal columns contains: " \
                    + f"{tsdataset.dtypes.index[tsdataset.dtypes == dtype].tolist()}"
                raise_log(TypeError(msg))

            # Check whether the data contains NaN or inf; this only warns, it does not fail.
            if np.isnan(tsdataset[column]).any() or np.isinf(tsdataset[column]).any():
                msg = "np.inf or np.NaN, which may lead to unexpected results from the model"
                msg = f"Input `{column}` contains {msg}."
                logger.warning(msg)

        if new_dtypes:
            tsdataset.astype(new_dtypes)

    def _init_optimizer(self) -> Optimizer:
        """Setup optimizer.

        Returns:
            Optimizer: `_optimizer_fn` called with `_optimizer_params` and the network parameters.
        """
        return self._optimizer_fn(
            **self._optimizer_params,
            parameters=self._network.parameters()
        )

    def _init_fit_dataloaders(
        self,
        train_tsdatasets: List[TSDataset],
        train_labels: np.ndarray,
        valid_tsdatasets: List[TSDataset] = None,
        valid_labels: np.ndarray = None,
        shuffle: bool = True
    ) -> Tuple[paddle.io.DataLoader, Optional[paddle.io.DataLoader]]:
        """Generate dataloaders for train and eval set.

        Also records the class labels and the number of classes found in the train set.

        Args:
            train_tsdatasets(List[TSDataset]): Train set.
            train_labels(np.ndarray): The train data class labels.
            valid_tsdatasets(List[TSDataset]|None): Eval set.
            valid_labels(np.ndarray|None): The valid data class labels.
            shuffle(bool): Shuffle or not. Applied to both dataloaders.

        Returns:
            paddle.io.DataLoader: Training dataloader.
            paddle.io.DataLoader|None: Validation dataloader, None when no eval set is given.
        """
        self._check_tsdatasets(train_tsdatasets, train_labels)
        data_adapter = ClassifyDataAdapter()
        train_dataset = data_adapter.to_paddle_dataset(
            train_tsdatasets,
            train_labels,
        )
        # Keep both attribute names in sync (see __init__).
        self._n_classes = train_dataset.n_classes_
        self._n_class = self._n_classes
        self._classes_ = train_dataset.classes_
        train_dataloader = data_adapter.to_paddle_dataloader(train_dataset, self._batch_size, shuffle=shuffle)

        if valid_tsdatasets is None:
            valid_dataloader = None
        else:
            self._check_tsdatasets(valid_tsdatasets, valid_labels)
            valid_dataset = data_adapter.to_paddle_dataset(
                valid_tsdatasets,
                valid_labels,
            )
            valid_dataloader = data_adapter.to_paddle_dataloader(valid_dataset, self._batch_size, shuffle=shuffle)

        return train_dataloader, valid_dataloader

    def _init_predict_dataloader(
        self,
        tsdatasets: List[TSDataset],
        labels: np.ndarray = None
    ) -> paddle.io.DataLoader:
        """Generate dataloaders for data to be predicted.

        Args:
            tsdatasets(List[TSDataset]): Data to be predicted.
            labels(np.ndarray|None): The predicted data class labels.

        Returns:
            paddle.io.DataLoader: Non-shuffled dataloader.
        """
        self._check_tsdatasets(tsdatasets, labels)
        data_adapter = ClassifyDataAdapter()
        dataset = data_adapter.to_paddle_dataset(
            tsdatasets,
            labels,
        )
        dataloader = data_adapter.to_paddle_dataloader(dataset, self._batch_size, shuffle=False)
        return dataloader

    def _init_metrics(
        self,
        eval_names: List[str]
    ) -> Tuple[List[Metric], List[str], Dict[str, MetricContainer]]:
        """Set attributes relative to the metrics.

        Args:
            eval_names(List[str]): List of eval set names.

        Returns:
            List[Metric]: List of metric instance.
            List[str]: List of metric names.
            Dict[str, MetricContainer]: Dict of metric container.
        """
        metrics = self._eval_metrics
        metric_container_dict = OrderedDict()
        for name in eval_names:
            metric_container_dict.update({
                name: MetricContainer(metrics, prefix=f"{name}_")
            })

        # Flatten the per-eval-set containers into plain lists.
        metrics, metrics_names = [], []
        for _, metric_container in metric_container_dict.items():
            metrics.extend(metric_container._metrics)
            metrics_names.extend(metric_container._names)
        return metrics, metrics_names, metric_container_dict

    def _init_callbacks(self) -> Tuple[History, CallbackContainer]:
        """Setup the callbacks functions.

        Returns:
            History: Callback that records events into a `History` object.
            CallbackContainer: Container holding a list of callbacks.
        """
        # Use the last metric in the container as the standard for early stopping.
        early_stopping_metric = (
            self._metrics_names[-1] if len(self._metrics_names) > 0 else None
        )

        # Set callback functions, including history, early stopping, etc..
        history, callbacks = History(self._verbose), []
        callbacks.append(history)

        if (early_stopping_metric is not None) and (self._patience > 0):
            early_stopping = EarlyStopping(
                early_stopping_metric=early_stopping_metric,
                is_maximize=self._metrics[-1]._MAXIMIZE,
                patience=self._patience
            )
            callbacks.append(early_stopping)
        else:
            logger.warning("No early stopping will be performed, last training weights will be used.")

        # User supplied callbacks run after the built-in ones.
        if self._callbacks:
            callbacks.extend(self._callbacks)
        callback_container = CallbackContainer(callbacks)
        callback_container.set_trainer(self)
        return history, callback_container

    def fit(
        self,
        train_tsdatasets: List[TSDataset],
        train_labels: np.ndarray,
        valid_tsdatasets: List[TSDataset] = None,
        valid_labels: np.ndarray = None
    ):
        """Train a neural network stored in self._network,
            using train_dataloader for training data and valid_dataloader for validation.

        Args:
            train_tsdatasets(List[TSDataset]): Train set.
            train_labels(np.ndarray): The train data class labels.
            valid_tsdatasets(List[TSDataset]|None): Eval set, used for early stopping.
            valid_labels(np.ndarray|None): The valid data class labels.
        """
        self._fit_params = self._update_fit_params(train_tsdatasets, train_labels, valid_tsdatasets, valid_labels)
        train_dataloader, valid_dataloader = self._init_fit_dataloaders(
            train_tsdatasets, train_labels, valid_tsdatasets, valid_labels
        )
        self._fit(train_dataloader, valid_dataloader)

    def _fit(
        self,
        train_dataloader: paddle.io.DataLoader,
        valid_dataloader: Optional[paddle.io.DataLoader] = None
    ):
        """Fit function core logic.

        Args:
            train_dataloader(paddle.io.DataLoader): Train set.
            valid_dataloader(paddle.io.DataLoader|None): Eval set.
        """
        valid_names = [] if valid_dataloader is None else ["val_0"]
        self._metrics, self._metrics_names, \
            self._metric_container_dict = self._init_metrics(valid_names)
        self._history, self._callback_container = self._init_callbacks()
        self._network = self._init_network()
        self._optimizer = self._init_optimizer()
        # The returned RandomState is not used here; the call is kept as in the original.
        check_random_state(self._seed)

        # Call the `on_train_begin` method of each callback before the training starts.
        self._callback_container.on_train_begin({"start_time": time.time()})
        for epoch_idx in range(self._max_epochs):

            # Call the `on_epoch_begin` method of each callback before the epoch starts.
            self._callback_container.on_epoch_begin(epoch_idx)
            self._train_epoch(train_dataloader)

            if len(valid_names) > 0:
                self._predict_epoch(valid_names[0], valid_dataloader)

            # Call the `on_epoch_end` method of each callback at the end of the epoch.
            self._callback_container.on_epoch_end(
                epoch_idx, logs=self._history._epoch_metrics
            )
            # `_stop_training` is expected to be flipped by a callback (e.g. early stopping).
            if self._stop_training:
                break

        # Call the `on_train_end` method of each callback at the end of the training.
        self._callback_container.on_train_end()
        self._network.eval()

    def predict(
        self,
        tsdatasets: List[TSDataset],
    ) -> np.ndarray:
        """Predict labels. The result is output as ndarray.

        For every sample the label with the highest probability is returned; ties are
        broken by a random choice drawn from a RandomState built from `_seed`.

        Args:
            tsdatasets(List[TSDataset]): Data to be predicted.

        Returns:
            np.ndarray: Predicted class labels.
        """
        dataloader = self._init_predict_dataloader(tsdatasets)
        probs = self._predict(dataloader)
        rng = check_random_state(self._seed)
        return np.array(
            [
                self._classes_[int(rng.choice(np.flatnonzero(prob == prob.max())))]
                for prob in probs
            ]
        )

    def predict_proba(
        self,
        tsdatasets: List[TSDataset]
    ) -> np.ndarray:
        """Find probability estimates for each class for all cases.

        Args:
            tsdatasets(List[TSDataset]): Data to be predicted.

        Returns:
            np.ndarray: One row per sample, one column per class.
        """
        dataloader = self._init_predict_dataloader(tsdatasets)
        return self._predict(dataloader)

    def _predict(
        self,
        dataloader: paddle.io.DataLoader
    ) -> np.ndarray:
        """Predict function core logic.

        Args:
            dataloader(paddle.io.DataLoader): Data to be predicted.

        Returns:
            np.ndarray: Network outputs stacked over all batches and normalised so that
                every row sums to 1.
        """
        self._network.eval()
        results = []
        for _, data in enumerate(dataloader):
            X, _ = self._prepare_X_y(data)
            output = self._network(X)
            results.append(output.numpy())
        results = np.vstack(results)

        # Binary classification with a single output column: expand it so that the first
        # column is the probability of class 0 and the second the probability of class 1.
        # (Previously the expansion was stored in an unused variable, so the single column
        # was normalised against itself and every probability collapsed to 1.)
        if results.shape[1] == 1:
            results = np.hstack([1 - results, results])
        results = results / results.sum(axis=1, keepdims=True)
        return results

    def _train_epoch(
        self,
        train_loader: paddle.io.DataLoader
    ):
        """Trains one epoch of the network in self._network.

        Args:
            train_loader(paddle.io.DataLoader): Training dataloader.
        """
        self._network.train()
        for batch_idx, data in enumerate(train_loader):
            self._callback_container.on_batch_begin(batch_idx)
            X, y = self._prepare_X_y(data)
            batch_logs = self._train_batch(X, y)
            self._callback_container.on_batch_end(batch_idx, batch_logs)
        # Record the current learning rate alongside the epoch metrics.
        epoch_logs = {"lr": self._optimizer.get_lr()}
        self._history._epoch_metrics.update(epoch_logs)

    def _train_batch(
        self,
        X: Dict[str, paddle.Tensor],
        y: paddle.Tensor
    ) -> Dict[str, Any]:
        """Trains one batch of data.

        Args:
            X(Dict[str, paddle.Tensor]): Dict of feature tensor.
            y(paddle.Tensor): Target tensor.

        Returns:
            Dict[str, Any]: Dict of logs with the keys "batch_size" and "loss".
        """
        output = self._network(X)
        loss = self._compute_loss(output, y)
        loss.backward()
        self._optimizer.step()
        self._optimizer.clear_grad()
        batch_logs = {
            "batch_size": y.shape[0],
            "loss": loss.item()
        }
        return batch_logs

    def _predict_epoch(
        self,
        name: str,
        loader: paddle.io.DataLoader
    ):
        """Predict an epoch and update metrics.

        Args:
            name(str): Name of the validation set.
            loader(paddle.io.DataLoader): DataLoader with validation set.
        """
        self._network.eval()
        list_y_true, list_y_score = [], []
        for _, data in enumerate(loader):
            X, y = self._prepare_X_y(data)
            scores = self._predict_batch(X)
            list_y_true.append(y)
            list_y_score.append(scores)
        y_true, scores = np.vstack(list_y_true), np.vstack(list_y_score)
        metrics_logs = self._metric_container_dict[name](y_true, scores)
        self._history._epoch_metrics.update(metrics_logs)
        # Switch back to training mode for the next epoch.
        self._network.train()

    def _predict_batch(
        self,
        X: paddle.Tensor
    ) -> np.ndarray:
        """Predict one batch of data.

        Args:
            X(paddle.Tensor): Feature tensor.

        Returns:
            np.ndarray: Prediction results.
        """
        scores = self._network(X)
        return scores.numpy()

    def _prepare_X_y(
        self,
        X: Dict[str, paddle.Tensor]
    ) -> Tuple[Dict[str, paddle.Tensor], paddle.Tensor]:
        """Split the packet into X, y.

        Note:
            This function could be overrided by the subclass if necessary.

        Args:
            X(Dict[str, paddle.Tensor]): Dict of feature tensor, must contain the key "label".

        Returns:
            X(Dict[str, paddle.Tensor]): The input dict, returned unchanged.
            y(paddle.Tensor): The entry stored under "label".
        """
        y = X['label']
        return X, y

    def _compute_loss(
        self,
        y_score: paddle.Tensor,
        y_true: paddle.Tensor
    ) -> paddle.Tensor:
        """Compute the loss.

        Note:
            This function could be overrided by the subclass if necessary.

        Args:
            y_score(paddle.Tensor): Estimated target values.
            y_true(paddle.Tensor): Ground truth (correct) target values.

        Returns:
            paddle.Tensor: Loss value.
        """
        # Cross entropy is called with soft labels; any other loss is called as-is.
        # NOTE(review): `_loss_fn` defaults to None in __init__, which would fail here -
        # presumably subclasses always supply a loss; confirm.
        if self._loss_fn == paddle.nn.functional.cross_entropy:
            return paddle.nn.functional.cross_entropy(y_score, y_true, soft_label=True)
        else:
            return self._loss_fn(y_score, y_true)

    def score(
        self,
        tsdatasets: List[TSDataset],
        labels: np.ndarray
    ) -> float:
        """Scores predicted labels against ground truth labels on X.

        Args:
            tsdatasets(List[TSDataset]): Data to be predicted.
            labels(np.ndarray): The ground truth class labels.

        Returns:
            float: Accuracy score of predict(tsdatasets) vs labels.
        """
        from sklearn.metrics import accuracy_score
        preds = self.predict(tsdatasets)
        return accuracy_score(labels, preds, normalize=True)

    @abc.abstractmethod
    def _update_fit_params(
        self,
        train_tsdatasets: List[TSDataset],
        train_labels: np.ndarray,
        valid_tsdatasets: List[TSDataset],
        valid_labels: np.ndarray
    ) -> Dict[str, Any]:
        """Infer parameters by TSdataset automatically.

        Args:
            train_tsdatasets(List[TSDataset]): Train set.
            train_labels(np.ndarray): The train data class labels.
            valid_tsdatasets(List[TSDataset]): Eval set.
            valid_labels(np.ndarray): The valid data class labels.

        Returns:
            Dict[str, Any]: model parameters.
        """
        pass

    @abc.abstractmethod
    def _init_network(self) -> paddle.nn.Layer:
        """Setup the network.

        Returns:
            paddle.nn.Layer.
        """
        pass

    def save(self, path: str) -> None:
        """Saves a PaddleBaseClassifier instance to a disk file.

        Three files are written next to each other: the pickled model at `path`, plus
        `<name>_network_statedict` and `<name>_model_meta`, where `<name>` is the base
        name of `path`.

        Args:
            path(str): A path string containing a model file name.

        Raises:
            ValueError
        """
        abs_model_path = os.path.abspath(path)
        abs_root_path = os.path.dirname(abs_model_path)
        raise_if_not(
            os.path.exists(abs_root_path),
            "failed to save model, path not exists: %s" % abs_root_path
        )
        raise_if(
            os.path.isdir(abs_model_path),
            "failed to save model, path must be a file, not directory: %s" % abs_model_path
        )
        raise_if(
            os.path.exists(abs_model_path),
            "Failed to save model, target file already exists: %s" % abs_model_path
        )

        raise_if(self._network is None, "failed to save model, model._network must not be None.")

        # path to save other internal files.
        # adding modelname as each internal file name prefix to allow multiple models to be saved at same dir.
        # examples (assume there are 2 models `a` and `b`):
        # a.modelname = "a"
        # a.model_meta_name = "a_model_meta"
        # a.network_statedict = "a_network_statedict"
        # b.modelname = "b"
        # b.model_meta_name = "b_model_meta"
        # b.network_statedict = "b_network_statedict"
        # given above example, adding name prefix avoids conflicts between a.internal files and b.internal files.
        # The optimizer state is currently not persisted.
        modelname = os.path.basename(abs_model_path)
        internal_filename_map = {
            "model_meta": "%s_%s" % (modelname, "model_meta"),
            "network_statedict": "%s_%s" % (modelname, "network_statedict"),
        }

        # internal files must not conflict with existing files; report the ones that do.
        conflict_files = set(internal_filename_map.values()) & set(os.listdir(abs_root_path))
        raise_if(
            len(conflict_files) > 0,
            "failed to save model internal files, these files must not exist: %s" % conflict_files
        )

        # start to save
        # 1 save network state dict
        network_state_dict = self._network.state_dict()
        try:
            paddle.save(
                obj=network_state_dict,
                path=os.path.join(abs_root_path, internal_filename_map["network_statedict"])
            )
        except Exception as e:
            raise_log(
                ValueError(
                    "error occurred while saving %s: %s, err: %s" %
                    (internal_filename_map["network_statedict"], network_state_dict, str(e))
                )
            )

        # 2 save model
        optimizer = self._optimizer
        network = self._network
        callback_container = self._callback_container

        # _network and _optimizer are paddle-related objects that pickle cannot serialize, and
        # _callback_container holds a reference back to this model, so all three are detached
        # while pickling.
        self._network = None
        self._optimizer = None
        self._callback_container = None
        try:
            with open(abs_model_path, "wb") as f:
                pickle.dump(self, f)
        except Exception as e:
            raise_log(ValueError("error occurred while saving %s, err: %s" % (abs_model_path, str(e))))
        finally:
            # Always set the attributes back, even when pickling failed, so the instance
            # stays usable and can be saved again.
            self._optimizer = optimizer
            self._network = network
            self._callback_container = callback_container

        # 3 save model meta (e.g. classname)
        model_meta = {
            # ChildModel,PaddleBaseModelImpl,PaddleBaseModel,BaseModel,Trainable,ABC,object
            "ancestor_classname_set": [clazz.__name__ for clazz in self.__class__.mro()],
            "modulename": self.__module__
        }
        try:
            with open(os.path.join(abs_root_path, internal_filename_map["model_meta"]), "w") as f:
                json.dump(model_meta, f, ensure_ascii=False)
        except Exception as e:
            raise_log(
                ValueError("error occurred while saving %s, err: %s" % (internal_filename_map["model_meta"], str(e)))
            )
        return

    @staticmethod
    def load(path: str) -> "PaddleBaseClassifier":
        """Loads a PaddleBaseClassifier from a file.

        The network is rebuilt with `_init_network()` and its weights are read from the
        `<name>_network_statedict` file stored next to `path`. The optimizer is not restored.

        Args:
            path(str): A path string containing a model file name.

        Returns:
            PaddleBaseClassifier: the loaded PaddleBaseClassifier instance.
        """
        abs_path = os.path.abspath(path)
        raise_if_not(os.path.exists(abs_path), "model file does not exist: %s" % abs_path)
        raise_if(os.path.isdir(abs_path), "path must be a file path, not a directory: %s" % abs_path)

        # 1.1 model
        # WARNING: pickle.load can execute arbitrary code; only load model files from trusted sources.
        with open(abs_path, "rb") as f:
            model = pickle.load(f)
        raise_if_not(
            isinstance(model, PaddleBaseClassifier),
            "loaded model type must be inherited from %s, but actual loaded model type: %s" %
            (PaddleBaseClassifier, model.__class__)
        )

        # 1.2 - 1.4 model._network
        model._network = model._init_network()
        raise_if(model._network is None, "model._network must not be None after calling _init_network()")

        modelname = os.path.basename(abs_path)
        network_statedict_filename = "%s_%s" % (modelname, "network_statedict")
        network_statedict_abs_path = os.path.join(os.path.dirname(abs_path), network_statedict_filename)
        network_statedict = paddle.load(network_statedict_abs_path)
        model._network.set_state_dict(network_statedict)
        return model