Source code for sklearnex.linear_model.incremental_linear

# ===============================================================================
# Copyright 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

import numbers
import warnings

import numpy as np
from sklearn.base import BaseEstimator, MultiOutputMixin, RegressorMixin
from sklearn.metrics import r2_score
from sklearn.utils import check_array, gen_batches
from sklearn.utils.validation import check_is_fitted

from daal4py.sklearn._n_jobs_support import control_n_jobs
from daal4py.sklearn._utils import sklearn_check_version
from onedal.linear_model import (
    IncrementalLinearRegression as onedal_IncrementalLinearRegression,
)

if sklearn_check_version("1.2"):
    from sklearn.utils._param_validation import Interval

if sklearn_check_version("1.6"):
    from sklearn.utils.validation import validate_data
else:
    validate_data = BaseEstimator._validate_data

from onedal.common.hyperparameters import get_hyperparameters

from .._device_offload import dispatch, wrap_output_data
from .._utils import IntelEstimator, PatchingConditionsChain, register_hyperparameters


@register_hyperparameters(
    {
        "fit": get_hyperparameters("linear_regression", "train"),
        "partial_fit": get_hyperparameters("linear_regression", "train"),
    }
)
@control_n_jobs(
    decorated_methods=["fit", "partial_fit", "predict", "score", "_onedal_finalize_fit"]
)
class IncrementalLinearRegression(
    IntelEstimator, MultiOutputMixin, RegressorMixin, BaseEstimator
):
    """
    Trains a linear regression model, allows for computation if the data are split into
    batches. The user can use the ``partial_fit`` method to provide a single batch of
    data or use the ``fit`` method to provide the entire dataset.

    Parameters
    ----------
    fit_intercept : bool, default=True
        Whether to calculate the intercept for this model. If set
        to False, no intercept will be used in calculations
        (i.e. data is expected to be centered).

    copy_X : bool, default=True
        If True, X will be copied; else, it may be overwritten.

    n_jobs : int, default=None
        The number of jobs to use for the computation.

    batch_size : int, default=None
        The number of samples to use for each batch. Only used when calling
        ``fit``. If ``batch_size`` is ``None``, then ``batch_size``
        is inferred from the data and set to ``5 * n_features``.

    Attributes
    ----------
    coef_ : array of shape (n_features, ) or (n_targets, n_features)
        Estimated coefficients for the linear regression problem.
        If multiple targets are passed during the fit (y 2D), this
        is a 2D array of shape (n_targets, n_features), while if only
        one target is passed, this is a 1D array of length n_features.

    intercept_ : float or array of shape (n_targets,)
        Independent term in the linear model. Set to 0.0 if
        `fit_intercept = False`.

    n_samples_seen_ : int
        The number of samples processed by the estimator. Will be reset on
        new calls to ``fit``, but increments across ``partial_fit`` calls.
        It should be not less than `n_features_in_` if `fit_intercept`
        is False and not less than `n_features_in_` + 1 if `fit_intercept`
        is True to obtain regression coefficients.

    batch_size_ : int
        Inferred batch size from ``batch_size``.

    n_features_in_ : int
        Number of features seen during ``fit`` or ``partial_fit``.

    Examples
    --------
    >>> import numpy as np
    >>> from sklearnex.linear_model import IncrementalLinearRegression
    >>> inclr = IncrementalLinearRegression(batch_size=2)
    >>> X = np.array([[1, 2], [3, 4], [5, 6], [7, 10]])
    >>> y = np.array([1.5, 3.5, 5.5, 8.5])
    >>> inclr.partial_fit(X[:2], y[:2])
    >>> inclr.partial_fit(X[2:], y[2:])
    >>> inclr.coef_
    np.array([0.5, 0.5])
    >>> inclr.intercept_
    np.array(0.)
    >>> inclr.fit(X, y)
    >>> inclr.coef_
    np.array([0.5, 0.5])
    >>> inclr.intercept_
    np.array(0.)
    """

    # Backend estimator class; kept as a static attribute so it is not bound
    # as a method when accessed through an instance.
    _onedal_incremental_linear = staticmethod(onedal_IncrementalLinearRegression)

    if sklearn_check_version("1.2"):
        _parameter_constraints: dict = {
            "fit_intercept": ["boolean"],
            "copy_X": ["boolean"],
            "n_jobs": [Interval(numbers.Integral, -1, None, closed="left"), None],
            "batch_size": [Interval(numbers.Integral, 1, None, closed="left"), None],
        }

    def __init__(self, *, fit_intercept=True, copy_X=True, n_jobs=None, batch_size=None):
        self.fit_intercept = fit_intercept
        self.copy_X = copy_X
        self.n_jobs = n_jobs
        self.batch_size = batch_size

    def _onedal_supported(self, method_name, *data):
        """Return the patching status for ``method_name``.

        No conditions are added to the chain here; it is returned as created.
        """
        patching_status = PatchingConditionsChain(
            f"sklearn.linear_model.{self.__class__.__name__}.{method_name}"
        )
        return patching_status

    # The same (unconditional) check is used for both device types.
    _onedal_cpu_supported = _onedal_supported
    _onedal_gpu_supported = _onedal_supported

    def _onedal_predict(self, X, queue=None):
        """Validate ``X``, finalize any pending partial fits and predict."""
        if sklearn_check_version("1.2"):
            self._validate_params()

        if sklearn_check_version("1.0"):
            X = validate_data(
                self,
                X,
                dtype=[np.float64, np.float32],
                copy=self.copy_X,
                reset=False,
            )
        else:
            X = check_array(
                X,
                dtype=[np.float64, np.float32],
                copy=self.copy_X,
            )

        assert hasattr(self, "_onedal_estimator")
        # Batches passed to ``partial_fit`` are only turned into a model on
        # ``finalize_fit``; do that lazily before the first prediction.
        if self._need_to_finalize:
            self._onedal_finalize_fit()
        return self._onedal_estimator.predict(X, queue=queue)

    def _onedal_score(self, X, y, sample_weight=None, queue=None):
        """Return the R^2 score of the predictions for ``X`` against ``y``."""
        return r2_score(
            y, self._onedal_predict(X, queue=queue), sample_weight=sample_weight
        )

    def _onedal_partial_fit(self, X, y, check_input=True, queue=None):
        """Feed one batch ``(X, y)`` to the backend estimator.

        When ``check_input`` is False the arrays are passed on without
        validation (``fit`` validates the full dataset once up front).
        The model is not finalized here; ``_need_to_finalize`` is set instead.
        """
        first_pass = not hasattr(self, "n_samples_seen_") or self.n_samples_seen_ == 0

        if sklearn_check_version("1.2"):
            self._validate_params()

        if check_input:
            if sklearn_check_version("1.0"):
                # scikit-learn 1.6 renamed ``force_all_finite`` to
                # ``ensure_all_finite``; the old name is deprecated there.
                if sklearn_check_version("1.6"):
                    finite_kwarg = {"ensure_all_finite": False}
                else:
                    finite_kwarg = {"force_all_finite": False}
                X, y = validate_data(
                    self,
                    X,
                    y,
                    dtype=[np.float64, np.float32],
                    reset=first_pass,
                    copy=self.copy_X,
                    multi_output=True,
                    **finite_kwarg,
                )
            else:
                X = check_array(
                    X,
                    dtype=[np.float64, np.float32],
                    copy=self.copy_X,
                    force_all_finite=False,
                )
                y = check_array(
                    y,
                    dtype=[np.float64, np.float32],
                    copy=False,
                    ensure_2d=False,
                    force_all_finite=False,
                )

        if first_pass:
            self.n_samples_seen_ = X.shape[0]
            self.n_features_in_ = X.shape[1]
        else:
            self.n_samples_seen_ += X.shape[0]

        onedal_params = {"fit_intercept": self.fit_intercept, "copy_X": self.copy_X}
        # The backend estimator is created once and reused across batches.
        if not hasattr(self, "_onedal_estimator"):
            self._onedal_estimator = self._onedal_incremental_linear(**onedal_params)
        self._onedal_estimator.partial_fit(X, y, queue=queue)
        self._need_to_finalize = True

    def _onedal_finalize_fit(self, queue=None):
        """Finalize the backend model from the batches seen so far.

        Raises
        ------
        ValueError
            If fewer samples were seen than ``n_features_in_`` (plus one when
            ``fit_intercept`` is True).
        """
        assert hasattr(self, "_onedal_estimator")
        is_underdetermined = self.n_samples_seen_ < self.n_features_in_ + int(
            self.fit_intercept
        )
        if is_underdetermined:
            raise ValueError("Not enough samples to finalize")
        self._onedal_estimator.finalize_fit(queue=queue)
        self._need_to_finalize = False

    def _onedal_fit(self, X, y, queue=None):
        """Validate the full dataset, then fit it batch by batch and finalize.

        Raises
        ------
        ValueError
            If ``n_samples`` is less than ``n_features`` (plus one when
            ``fit_intercept`` is True).
        """
        if sklearn_check_version("1.2"):
            self._validate_params()

        if sklearn_check_version("1.0"):
            X, y = validate_data(
                self,
                X,
                y,
                dtype=[np.float64, np.float32],
                copy=self.copy_X,
                multi_output=True,
                ensure_2d=True,
            )
        else:
            X = check_array(
                X,
                dtype=[np.float64, np.float32],
                copy=self.copy_X,
            )
            y = check_array(
                y,
                dtype=[np.float64, np.float32],
                copy=False,
                ensure_2d=False,
            )

        n_samples, n_features = X.shape

        is_underdetermined = n_samples < n_features + int(self.fit_intercept)
        if is_underdetermined:
            raise ValueError("Not enough samples to run oneDAL backend")

        if self.batch_size is None:
            self.batch_size_ = 5 * n_features
        else:
            self.batch_size_ = self.batch_size

        # ``fit`` starts from scratch: reset the sample counter and any state
        # accumulated by earlier ``fit``/``partial_fit`` calls.
        self.n_samples_seen_ = 0
        if hasattr(self, "_onedal_estimator"):
            self._onedal_estimator._reset()

        # Input was validated above, so the per-batch check is skipped.
        for batch in gen_batches(n_samples, self.batch_size_):
            X_batch, y_batch = X[batch], y[batch]
            self._onedal_partial_fit(X_batch, y_batch, check_input=False, queue=queue)

        # finite check occurs on onedal side
        self.n_features_in_ = n_features

        if n_samples == 1:
            warnings.warn(
                "Only one sample available. You may want to reshape your data array"
            )

        self._onedal_finalize_fit(queue=queue)

        return self

    @property
    def intercept_(self):
        # Reading the attribute finalizes any pending partial fits first.
        if hasattr(self, "_onedal_estimator"):
            if self._need_to_finalize:
                self._onedal_finalize_fit()
            return self._onedal_estimator.intercept_
        else:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute 'intercept_'"
            )

    @intercept_.setter
    def intercept_(self, value):
        self.__dict__["intercept_"] = value
        if hasattr(self, "_onedal_estimator"):
            self._onedal_estimator.intercept_ = value
            # NOTE(review): presumably drops the cached backend model so it is
            # rebuilt from the new value - confirm against the onedal estimator.
            del self._onedal_estimator._onedal_model

    @property
    def coef_(self):
        # Reading the attribute finalizes any pending partial fits first.
        if hasattr(self, "_onedal_estimator"):
            if self._need_to_finalize:
                self._onedal_finalize_fit()
            return self._onedal_estimator.coef_
        else:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute 'coef_'"
            )

    @coef_.setter
    def coef_(self, value):
        self.__dict__["coef_"] = value
        if hasattr(self, "_onedal_estimator"):
            self._onedal_estimator.coef_ = value
            # NOTE(review): presumably drops the cached backend model so it is
            # rebuilt from the new value - confirm against the onedal estimator.
            del self._onedal_estimator._onedal_model

    def partial_fit(self, X, y, check_input=True):
        """
        Incremental fit linear model with X and y. All of X and y is
        processed as a single batch.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where ``n_samples`` is the number of samples and
            `n_features` is the number of features.

        y : array-like of shape (n_samples,) or (n_samples, n_targets)
            Target values, where ``n_samples`` is the number of samples and
            ``n_targets`` is the number of targets.

        check_input : bool, default=True
            Whether to validate ``X`` and ``y`` before they are passed to the
            backend.

        Returns
        -------
        self : object
            Returns the instance itself.
        """

        dispatch(
            self,
            "partial_fit",
            {
                "onedal": self.__class__._onedal_partial_fit,
                "sklearn": None,
            },
            X,
            y,
            check_input=check_input,
        )
        return self

    def fit(self, X, y):
        """
        Fit the model with X and y, using minibatches of size ``batch_size``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where ``n_samples`` is the number of samples and
            ``n_features`` is the number of features. It is necessary for
            ``n_samples`` to be not less than ``n_features`` if ``fit_intercept``
            is False and not less than ``n_features + 1`` if ``fit_intercept``
            is True

        y : array-like of shape (n_samples,) or (n_samples, n_targets)
            Target values, where ``n_samples`` is the number of samples and
            ``n_targets`` is the number of targets.

        Returns
        -------
        self : object
            Returns the instance itself.
        """

        dispatch(
            self,
            "fit",
            {
                "onedal": self.__class__._onedal_fit,
                "sklearn": None,
            },
            X,
            y,
        )
        return self

    @wrap_output_data
    def predict(self, X, y=None):
        """
        Predict using the linear model.

        Parameters
        ----------
        X : array-like or sparse matrix, shape (n_samples, n_features)
            Samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        C : array, shape (n_samples, n_targets)
            Returns predicted values.
        """
        check_is_fitted(self)
        return dispatch(
            self,
            "predict",
            {
                "onedal": self.__class__._onedal_predict,
                "sklearn": None,
            },
            X,
        )

    @wrap_output_data
    def score(self, X, y, sample_weight=None):
        """Return the coefficient of determination of the prediction.

        The coefficient of determination :math:`R^2` is defined as
        :math:`(1 - \\frac{u}{v})`, where :math:`u` is the residual
        sum of squares ``((y_true - y_pred)** 2).sum()`` and :math:`v`
        is the total sum of squares ``((y_true - y_true.mean()) ** 2).sum()``.
        The best possible score is 1.0 and it can be negative (because the
        model can be arbitrarily worse). A constant model that always predicts
        the expected value of `y`, disregarding the input features, would get
        a :math:`R^2` score of 0.0.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Test samples. For some estimators this may be a precomputed
            kernel matrix or a list of generic objects instead with shape
            ``(n_samples, n_samples_fitted)``, where ``n_samples_fitted``
            is the number of samples used in the fitting for the estimator.

        y : array-like of shape (n_samples,) or (n_samples, n_outputs)
            True values for `X`.

        sample_weight : array-like of shape (n_samples,), default=None
            Sample weights.

        Returns
        -------
        score : float
            :math:`R^2` of ``self.predict(X)`` w.r.t. `y`.

        Notes
        -----
        The :math:`R^2` score used when calling ``score`` on a regressor uses
        ``multioutput='uniform_average'`` from version 0.23 to keep consistent
        with default value of :func:`~sklearn.metrics.r2_score`.
        This influences the ``score`` method of all the multioutput
        regressors (except for
        :class:`~sklearn.multioutput.MultiOutputRegressor`).
        """
        check_is_fitted(self)
        return dispatch(
            self,
            "score",
            {
                "onedal": self.__class__._onedal_score,
                "sklearn": None,
            },
            X,
            y,
            sample_weight=sample_weight,
        )