Source code for sklearnex.linear_model.incremental_linear

# ===============================================================================
# Copyright 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

import numbers
import warnings

import numpy as np
from sklearn.base import BaseEstimator, MultiOutputMixin, RegressorMixin
from sklearn.metrics import r2_score
from sklearn.utils import check_array, gen_batches
from sklearn.utils.validation import check_is_fitted

from daal4py.sklearn._n_jobs_support import control_n_jobs
from daal4py.sklearn._utils import daal_check_version, sklearn_check_version
from onedal.linear_model import (
    IncrementalLinearRegression as onedal_IncrementalLinearRegression,
)

if sklearn_check_version("1.2"):
    from sklearn.utils._param_validation import Interval

if sklearn_check_version("1.6"):
    from sklearn.utils.validation import validate_data
else:
    validate_data = BaseEstimator._validate_data

from onedal.common.hyperparameters import get_hyperparameters

from .._device_offload import dispatch, wrap_output_data
from .._utils import IntelEstimator, PatchingConditionsChain, register_hyperparameters


[docs]@register_hyperparameters( { "fit": get_hyperparameters("linear_regression", "train"), "partial_fit": get_hyperparameters("linear_regression", "train"), } ) @control_n_jobs( decorated_methods=["fit", "partial_fit", "predict", "score", "_onedal_finalize_fit"] ) class IncrementalLinearRegression( IntelEstimator, MultiOutputMixin, RegressorMixin, BaseEstimator ): """ Trains a linear regression model, allows for computation if the data are split into batches. The user can use the ``partial_fit`` method to provide a single batch of data or use the ``fit`` method to provide the entire dataset. Parameters ---------- fit_intercept : bool, default=True Whether to calculate the intercept for this model. If set to False, no intercept will be used in calculations (i.e. data is expected to be centered). copy_X : bool, default=True If True, X will be copied; else, it may be overwritten. n_jobs : int, default=None The number of jobs to use for the computation. batch_size : int, default=None The number of samples to use for each batch. Only used when calling ``fit``. If ``batch_size`` is ``None``, then ``batch_size`` is inferred from the data and set to ``5 * n_features``. Attributes ---------- coef_ : array of shape (n_features, ) or (n_targets, n_features) Estimated coefficients for the linear regression problem. If multiple targets are passed during the fit (y 2D), this is a 2D array of shape (n_targets, n_features), while if only one target is passed, this is a 1D array of length n_features. intercept_ : float or array of shape (n_targets,) Independent term in the linear model. Set to 0.0 if `fit_intercept = False`. n_samples_seen_ : int The number of samples processed by the estimator. Will be reset on new calls to ``fit``, but increments across ``partial_fit`` calls. It should be not less than `n_features_in_` if `fit_intercept` is False and not less than `n_features_in_` + 1 if `fit_intercept` is True to obtain regression coefficients. batch_size_ : int Inferred batch size from ``batch_size``. n_features_in_ : int Number of features seen during ``fit`` or ``partial_fit``. Note ---- Serializing instances of this class will trigger a forced finalization of calculations. Since finalize_fit can't be dispatched without directly provided queue and the dispatching policy can't be serialized, the computation is finalized during serialization call and the policy is not saved in serialized data. Examples -------- >>> import numpy as np >>> from sklearnex.linear_model import IncrementalLinearRegression >>> inclr = IncrementalLinearRegression(batch_size=2) >>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 10]]) >>> y = np.array([1.5, 3.5, 5.5, 8.5]) >>> inclr.partial_fit(X[:2], y[:2]) >>> inclr.partial_fit(X[2:], y[2:]) >>> inclr.coef_ np.array([0.5., 0.5.]) >>> inclr.intercept_ np.array(0.) >>> inclr.fit(X) >>> inclr.coef_ np.array([0.5., 0.5.]) >>> inclr.intercept_ np.array(0.) """ _onedal_incremental_linear = staticmethod(onedal_IncrementalLinearRegression) if sklearn_check_version("1.2"): _parameter_constraints: dict = { "fit_intercept": ["boolean"], "copy_X": ["boolean"], "n_jobs": [Interval(numbers.Integral, -1, None, closed="left"), None], "batch_size": [Interval(numbers.Integral, 1, None, closed="left"), None], } def __init__(self, *, fit_intercept=True, copy_X=True, n_jobs=None, batch_size=None): self.fit_intercept = fit_intercept self.copy_X = copy_X self.n_jobs = n_jobs self.batch_size = batch_size def _onedal_supported(self, method_name, *data): patching_status = PatchingConditionsChain( f"sklearn.linear_model.{self.__class__.__name__}.{method_name}" ) return patching_status _onedal_cpu_supported = _onedal_supported _onedal_gpu_supported = _onedal_supported def _onedal_predict(self, X, queue=None): if sklearn_check_version("1.2"): self._validate_params() if sklearn_check_version("1.0"): X = validate_data( self, X, dtype=[np.float64, np.float32], copy=self.copy_X, reset=False, ) else: X = check_array( X, dtype=[np.float64, np.float32], copy=self.copy_X, ) assert hasattr(self, "_onedal_estimator") if self._need_to_finalize: self._onedal_finalize_fit() return self._onedal_estimator.predict(X, queue=queue) def _onedal_score(self, X, y, sample_weight=None, queue=None): return r2_score( y, self._onedal_predict(X, queue=queue), sample_weight=sample_weight ) def _onedal_partial_fit(self, X, y, check_input=True, queue=None): first_pass = not hasattr(self, "n_samples_seen_") or self.n_samples_seen_ == 0 if sklearn_check_version("1.2"): self._validate_params() if check_input: if sklearn_check_version("1.0"): X, y = validate_data( self, X, y, dtype=[np.float64, np.float32], reset=first_pass, copy=self.copy_X, multi_output=True, force_all_finite=False, ) else: X = check_array( X, dtype=[np.float64, np.float32], copy=self.copy_X, force_all_finite=False, ) y = check_array( y, dtype=[np.float64, np.float32], copy=False, ensure_2d=False, force_all_finite=False, ) if first_pass: self.n_samples_seen_ = X.shape[0] self.n_features_in_ = X.shape[1] else: self.n_samples_seen_ += X.shape[0] onedal_params = {"fit_intercept": self.fit_intercept, "copy_X": self.copy_X} if not hasattr(self, "_onedal_estimator"): self._onedal_estimator = self._onedal_incremental_linear(**onedal_params) self._onedal_estimator.partial_fit(X, y, queue=queue) self._need_to_finalize = True if daal_check_version((2025, "P", 200)): def _onedal_validate_underdetermined(self, n_samples, n_features): pass else: def _onedal_validate_underdetermined(self, n_samples, n_features): is_underdetermined = n_samples < n_features + int(self.fit_intercept) if is_underdetermined: raise ValueError("Not enough samples for oneDAL") def _onedal_finalize_fit(self, queue=None): assert hasattr(self, "_onedal_estimator") self._onedal_validate_underdetermined(self.n_samples_seen_, self.n_features_in_) self._onedal_estimator.finalize_fit(queue=queue) self._need_to_finalize = False def _onedal_fit(self, X, y, queue=None): if sklearn_check_version("1.2"): self._validate_params() if sklearn_check_version("1.0"): X, y = validate_data( self, X, y, dtype=[np.float64, np.float32], copy=self.copy_X, multi_output=True, ensure_2d=True, ) else: X = check_array( X, dtype=[np.float64, np.float32], copy=self.copy_X, ) y = check_array( y, dtype=[np.float64, np.float32], copy=False, ensure_2d=False, ) n_samples, n_features = X.shape self._onedal_validate_underdetermined(n_samples, n_features) if self.batch_size is None: self.batch_size_ = 5 * n_features else: self.batch_size_ = self.batch_size self.n_samples_seen_ = 0 if hasattr(self, "_onedal_estimator"): self._onedal_estimator._reset() for batch in gen_batches(n_samples, self.batch_size_): X_batch, y_batch = X[batch], y[batch] self._onedal_partial_fit(X_batch, y_batch, check_input=False, queue=queue) if sklearn_check_version("1.2"): self._validate_params() # finite check occurs on onedal side self.n_features_in_ = n_features if n_samples == 1: warnings.warn( "Only one sample available. You may want to reshape your data array" ) self._onedal_finalize_fit(queue=queue) return self @property def intercept_(self): if hasattr(self, "_onedal_estimator"): if self._need_to_finalize: self._onedal_finalize_fit() return self._onedal_estimator.intercept_ else: raise AttributeError( f"'{self.__class__.__name__}' object has no attribute 'intercept_'" ) @intercept_.setter def intercept_(self, value): self.__dict__["intercept_"] = value if hasattr(self, "_onedal_estimator"): self._onedal_estimator.intercept_ = value del self._onedal_estimator._onedal_model @property def coef_(self): if hasattr(self, "_onedal_estimator"): if self._need_to_finalize: self._onedal_finalize_fit() return self._onedal_estimator.coef_ else: raise AttributeError( f"'{self.__class__.__name__}' object has no attribute 'coef_'" ) @coef_.setter def coef_(self, value): self.__dict__["coef_"] = value if hasattr(self, "_onedal_estimator"): self._onedal_estimator.coef_ = value del self._onedal_estimator._onedal_model
[docs] def partial_fit(self, X, y, check_input=True): """ Incremental fit linear model with X and y. All of X and y is processed as a single batch. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data, where ``n_samples`` is the number of samples and `n_features` is the number of features. y : array-like of shape (n_samples,) or (n_samples, n_targets) Target values, where ``n_samples`` is the number of samples and ``n_targets`` is the number of targets. Returns ------- self : object Returns the instance itself. """ dispatch( self, "partial_fit", { "onedal": self.__class__._onedal_partial_fit, "sklearn": None, }, X, y, check_input=check_input, ) return self
[docs] def fit(self, X, y): """ Fit the model with X and y, using minibatches of size ``batch_size``. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data, where ``n_samples`` is the number of samples and ``n_features`` is the number of features. It is necessary for ``n_samples`` to be not less than ``n_features`` if ``fit_intercept`` is False and not less than ``n_features + 1`` if ``fit_intercept`` is True y : array-like of shape (n_samples,) or (n_samples, n_targets) Target values, where ``n_samples`` is the number of samples and ``n_targets`` is the number of targets. Returns ------- self : object Returns the instance itself. """ dispatch( self, "fit", { "onedal": self.__class__._onedal_fit, "sklearn": None, }, X, y, ) return self
[docs] @wrap_output_data def predict(self, X, y=None): """ Predict using the linear model. Parameters ---------- X : array-like or sparse matrix, shape (n_samples, n_features) Samples. y : Ignored Not used, present for API consistency by convention. Returns ------- C : array, shape (n_samples, n_targets) Returns predicted values. """ check_is_fitted(self) return dispatch( self, "predict", { "onedal": self.__class__._onedal_predict, "sklearn": None, }, X, )
@wrap_output_data def score(self, X, y, sample_weight=None): """Return the coefficient of determination of the prediction. The coefficient of determination :math:`R^2` is defined as :math:`(1 - \\frac{u}{v})`, where :math:`u` is the residual sum of squares ``((y_true - y_pred)** 2).sum()`` and :math:`v` is the total sum of squares ``((y_true - y_true.mean()) ** 2).sum()``. The best possible score is 1.0 and it can be negative (because the model can be arbitrarily worse). A constant model that always predicts the expected value of `y`, disregarding the input features, would get a :math:`R^2` score of 0.0. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape ``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. sample_weight : array-like of shape (n_samples,), default=None Sample weights. Returns ------- score : float :math:`R^2` of ``self.predict(X)`` w.r.t. `y`. Notes ----- The :math:`R^2` score used when calling ``score`` on a regressor uses ``multioutput='uniform_average'`` from version 0.23 to keep consistent with default value of :func:`~sklearn.metrics.r2_score`. This influences the ``score`` method of all the multioutput regressors (except for :class:`~sklearn.multioutput.MultiOutputRegressor`). """ check_is_fitted(self) return dispatch( self, "score", { "onedal": self.__class__._onedal_score, "sklearn": None, }, X, y, sample_weight=sample_weight, )