# ===============================================================================
# Copyright 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

import numbers
import warnings

import numpy as np
from scipy import linalg
from sklearn.base import BaseEstimator, clone
from sklearn.covariance import EmpiricalCovariance as _sklearn_EmpiricalCovariance
from sklearn.covariance import log_likelihood
from sklearn.utils import check_array, gen_batches
from sklearn.utils.validation import _num_features, check_is_fitted

from daal4py.sklearn._n_jobs_support import control_n_jobs
from daal4py.sklearn._utils import daal_check_version, sklearn_check_version
from onedal.covariance import (
    IncrementalEmpiricalCovariance as onedal_IncrementalEmpiricalCovariance,
)
from sklearnex import config_context

from .._device_offload import dispatch, wrap_output_data
from .._utils import IntelEstimator, PatchingConditionsChain, register_hyperparameters
from ..metrics import pairwise_distances
from ..utils._array_api import get_namespace

if sklearn_check_version("1.2"):
    from sklearn.utils._param_validation import Interval

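# sklearn 1.6 exposes ``validate_data`` as a free function; on older versions
# the equivalent logic lives on ``BaseEstimator``, so alias the unbound method
# to keep the call sites below uniform.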
if sklearn_check_version("1.6"):
    from sklearn.utils.validation import validate_data
else:
    validate_data = BaseEstimator._validate_data


@control_n_jobs(decorated_methods=["partial_fit", "fit", "_onedal_finalize_fit"])
class IncrementalEmpiricalCovariance(IntelEstimator, BaseEstimator):
    """
    Maximum likelihood covariance estimator that allows for estimation when
    the data are split into batches. The user can use the ``partial_fit``
    method to provide a single batch of data or use the ``fit`` method to
    provide the entire dataset.

    Parameters
    ----------
    store_precision : bool, default=False
        Specifies if the estimated precision is stored.

    assume_centered : bool, default=False
        If True, data are not centered before computation.
        Useful when working with data whose mean is almost, but not exactly,
        zero. If False (default), data are centered before computation.

    batch_size : int, default=None
        The number of samples to use for each batch. Only used when calling
        ``fit``. If ``batch_size`` is ``None``, then ``batch_size`` is
        inferred from the data and set to ``5 * n_features``, to provide a
        balance between approximation accuracy and memory consumption.

    copy : bool, default=True
        If False, X will be overwritten. ``copy=False`` can be used to save
        memory but is unsafe for general use.

    Attributes
    ----------
    location_ : ndarray of shape (n_features,)
        Estimated location, i.e. the estimated mean.

    covariance_ : ndarray of shape (n_features, n_features)
        Estimated covariance matrix.

    n_samples_seen_ : int
        The number of samples processed by the estimator. Will be reset on
        new calls to ``fit``, but increments across ``partial_fit`` calls.

    batch_size_ : int
        Inferred batch size from ``batch_size``.

    n_features_in_ : int
        Number of features seen during ``fit`` or ``partial_fit``.

    Examples
    --------
    >>> import numpy as np
    >>> from sklearnex.covariance import IncrementalEmpiricalCovariance
    >>> inccov = IncrementalEmpiricalCovariance(batch_size=1)
    >>> X = np.array([[1, 2], [3, 4]])
    >>> inccov.partial_fit(X[:1])
    IncrementalEmpiricalCovariance(batch_size=1)
    >>> inccov.partial_fit(X[1:])
    IncrementalEmpiricalCovariance(batch_size=1)
    >>> inccov.covariance_
    array([[1., 1.],
           [1., 1.]])
    >>> inccov.location_
    array([2., 3.])
    >>> inccov.fit(X)
    IncrementalEmpiricalCovariance(batch_size=1)
    >>> inccov.covariance_
    array([[1., 1.],
           [1., 1.]])
    >>> inccov.location_
    array([2., 3.])
    """

    _onedal_incremental_covariance = staticmethod(
        onedal_IncrementalEmpiricalCovariance
    )

    if sklearn_check_version("1.2"):
        _parameter_constraints: dict = {
            "store_precision": ["boolean"],
            "assume_centered": ["boolean"],
            "batch_size": [Interval(numbers.Integral, 1, None, closed="left"), None],
            "copy": ["boolean"],
        }

    get_precision = _sklearn_EmpiricalCovariance.get_precision
    error_norm = wrap_output_data(_sklearn_EmpiricalCovariance.error_norm)

    def __init__(
        self, *, store_precision=False, assume_centered=False, batch_size=None, copy=True
    ):
        self.assume_centered = assume_centered
        self.store_precision = store_precision
        self.batch_size = batch_size
        self.copy = copy

    def _onedal_supported(self, method_name, *data):
        # No extra conditions are registered on the chain, so dispatching
        # always falls through to the oneDAL backend for this estimator.
        patching_status = PatchingConditionsChain(
            f"sklearn.covariance.{self.__class__.__name__}.{method_name}"
        )
        return patching_status

    def _onedal_finalize_fit(self, queue=None):
        assert hasattr(self, "_onedal_estimator")
        self._onedal_estimator.finalize_fit(queue=queue)
        self._need_to_finalize = False

        if not daal_check_version((2024, "P", 400)) and self.assume_centered:
            # Older oneDAL versions always center the data, so for
            # ``assume_centered`` restore the second moment about zero,
            # E[x x^T] = Cov(x) + mu mu^T, and report a zero mean.
            location = self._onedal_estimator.location_[None, :]
            self._onedal_estimator.covariance_ += np.dot(location.T, location)
            self._onedal_estimator.location_ = np.zeros_like(np.squeeze(location))
        if self.store_precision:
            self.precision_ = linalg.pinvh(
                self._onedal_estimator.covariance_, check_finite=False
            )
        else:
            self.precision_ = None

    @property
    def covariance_(self):
        if hasattr(self, "_onedal_estimator"):
            if self._need_to_finalize:
                self._onedal_finalize_fit()
            return self._onedal_estimator.covariance_
        else:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute 'covariance_'"
            )

    @property
    def location_(self):
        if hasattr(self, "_onedal_estimator"):
            if self._need_to_finalize:
                self._onedal_finalize_fit()
            return self._onedal_estimator.location_
        else:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute 'location_'"
            )

    def _onedal_partial_fit(self, X, queue=None, check_input=True):
        first_pass = not hasattr(self, "n_samples_seen_") or self.n_samples_seen_ == 0

        # finite check occurs on onedal side
        if check_input:
            if sklearn_check_version("1.2"):
                self._validate_params()

            if sklearn_check_version("1.0"):
                X = validate_data(
                    self,
                    X,
                    dtype=[np.float64, np.float32],
                    reset=first_pass,
                    copy=self.copy,
                    force_all_finite=False,
                )
            else:
                X = check_array(
                    X,
                    dtype=[np.float64, np.float32],
                    copy=self.copy,
                    force_all_finite=False,
                )

        onedal_params = {
            "method": "dense",
            "bias": True,
            "assume_centered": self.assume_centered,
        }
        if not hasattr(self, "_onedal_estimator"):
            self._onedal_estimator = self._onedal_incremental_covariance(**onedal_params)

        try:
            if first_pass:
                self.n_samples_seen_ = X.shape[0]
                self.n_features_in_ = X.shape[1]
            else:
                self.n_samples_seen_ += X.shape[0]

            self._onedal_estimator.partial_fit(X, queue=queue)
        finally:
            self._need_to_finalize = True

        return self

    @wrap_output_data
    def score(self, X_test, y=None):
        xp, _ = get_namespace(X_test)

        check_is_fitted(self)
        location = self.location_

        if sklearn_check_version("1.0"):
            X = validate_data(
                self,
                X_test,
                dtype=[np.float64, np.float32],
                reset=False,
            )
        else:
            X = check_array(
                X_test,
                dtype=[np.float64, np.float32],
            )

        if "numpy" not in xp.__name__:
            location = xp.asarray(location, device=X_test.device)
            # Depending on the sklearn version, check_array and validate_data
            # return only numpy arrays, which would break dpnp/dpctl support.
            # If the array namespace isn't numpy but the data is now a numpy
            # array, it has already been validated and the original can be used.
            if isinstance(X, np.ndarray):
                X = X_test

        est = clone(self)
        est.set_params(**{"assume_centered": True})
        # test_cov is a numpy array, but calculated on device
        test_cov = est.fit(X - location).covariance_
        res = log_likelihood(test_cov, self.get_precision())

        return res

    def partial_fit(self, X, y=None, check_input=True):
        """
        Incremental fit with X. All of X is processed as a single batch.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : Ignored
            Not used, present for API consistency by convention.

        check_input : bool, default=True
            Run check_array on X.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        return dispatch(
            self,
            "partial_fit",
            {
                "onedal": self.__class__._onedal_partial_fit,
                "sklearn": None,
            },
            X,
            check_input=check_input,
        )

    def fit(self, X, y=None):
        """
        Fit the model with X, using minibatches of size batch_size.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        return dispatch(
            self,
            "fit",
            {
                "onedal": self.__class__._onedal_fit,
                "sklearn": None,
            },
            X,
        )

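    # ``dispatch`` routes ``fit``/``partial_fit`` to the ``_onedal_*`` backends
    # when the patching conditions pass; ``"sklearn": None`` reflects that this
    # incremental estimator has no stock scikit-learn fallback.
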
    def _onedal_fit(self, X, queue=None):
        self.n_samples_seen_ = 0
        if hasattr(self, "_onedal_estimator"):
            self._onedal_estimator._reset()

        if sklearn_check_version("1.2"):
            self._validate_params()

        # finite check occurs on onedal side
        if sklearn_check_version("1.0"):
            X = validate_data(
                self,
                X,
                dtype=[np.float64, np.float32],
                copy=self.copy,
                force_all_finite=False,
            )
        else:
            X = check_array(
                X, dtype=[np.float64, np.float32], copy=self.copy, force_all_finite=False
            )
        self.n_features_in_ = X.shape[1]

        self.batch_size_ = self.batch_size if self.batch_size else 5 * self.n_features_in_

        if X.shape[0] == 1:
            warnings.warn(
                "Only one sample available. You may want to reshape your data array"
            )

        for batch in gen_batches(X.shape[0], self.batch_size_):
            X_batch = X[batch]
            self._onedal_partial_fit(X_batch, queue=queue, check_input=False)

        self._onedal_finalize_fit(queue=queue)

        return self

    # Exposes sklearnex pairwise_distances in case mahalanobis distance is
    # eventually supported there.
    def mahalanobis(self, X):
        if sklearn_check_version("1.0"):
            self._check_feature_names(X, reset=False)

        xp, _ = get_namespace(X)
        precision = self.get_precision()

        # Compute mahalanobis distances. pairwise_distances will check
        # n_features (via matching against self.location_) and will check for
        # finiteness via check_array; check_feature_names matches the behavior
        # of _validate_data.
        location = self.location_[np.newaxis, :]
        if "numpy" not in xp.__name__:
            # Guarantee that inputs to pairwise_distances match in type and location
            location = xp.asarray(location, device=X.device)

        try:
            dist = pairwise_distances(X, location, metric="mahalanobis", VI=precision)
        except ValueError as e:
            # Raise the error sklearn expects on an n_features mismatch
            if "Incompatible dimension for X and Y matrices: X.shape[1] ==" in str(e):
                raise ValueError(
                    f"X has {_num_features(X)} features, but {self.__class__.__name__} "
                    f"is expecting {self.n_features_in_} features as input."
                )
            else:
                raise e

        return (xp.reshape(dist, (-1,))) ** 2

    _onedal_cpu_supported = _onedal_supported
    _onedal_gpu_supported = _onedal_supported

    mahalanobis.__doc__ = _sklearn_EmpiricalCovariance.mahalanobis.__doc__
    error_norm.__doc__ = _sklearn_EmpiricalCovariance.error_norm.__doc__
    score.__doc__ = _sklearn_EmpiricalCovariance.score.__doc__
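
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the library): streams
# batches through ``partial_fit`` and checks the result against a single
# ``fit`` call. The random data, seed, and batch size are assumptions made
# for the demo.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X_demo = rng.standard_normal((1000, 10))

    inc = IncrementalEmpiricalCovariance()
    for batch in gen_batches(X_demo.shape[0], 250):
        inc.partial_fit(X_demo[batch])  # four batches of 250 samples each

    full = IncrementalEmpiricalCovariance().fit(X_demo)  # batches internally
    assert np.allclose(inc.covariance_, full.covariance_)
    print(inc.n_samples_seen_, inc.covariance_.shape)  # 1000 (10, 10)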