Source code for mars.learn.wrappers

# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Union

import numpy as np
from sklearn.base import (
    MetaEstimatorMixin,
    BaseEstimator as SklearnBaseEstimator,
    RegressorMixin as SklearnRegressorMixin,
    ClassifierMixin as SklearnClassifierMixin,
)

from .. import remote as mr
from .. import tensor as mt
from .base import BaseEstimator, RegressorMixin, ClassifierMixin
from .metrics import get_scorer
from .utils import copy_learned_attributes, check_array


def _wrap(estimator: SklearnBaseEstimator, method, X, y, **kwargs):
    return getattr(estimator, method)(X, y, **kwargs)


class ParallelPostFit(BaseEstimator, MetaEstimatorMixin):
    """
    Meta-estimator for parallel predict and transform.

    Parameters
    ----------
    estimator : Estimator
        The underlying estimator that is fit.

    scoring : string or callable, optional
        A single string (see :ref:`scoring_parameter`) or a callable
        (see :ref:`scoring`) to evaluate the predictions on the test set.

        For evaluating multiple metrics, either give a list of (unique)
        strings or a dict with names as keys and callables as values.

        NOTE that when using custom scorers, each scorer should return a
        single value. Metric functions returning a list/array of values
        can be wrapped into multiple scorers that return one value each.

        See :ref:`multimetric_grid_search` for an example.

        .. warning::

           If None, the estimator's default scorer (if available) is used.
           Most scikit-learn estimators will convert large Mars tensors to
           a single NumPy array, which may exhaust the memory of your worker.
           You probably want to always specify `scoring`.

    Notes
    -----

    .. warning::

       This class is not appropriate for parallel or distributed *training*
       on large datasets. For that, see :class:`Incremental`, which provides
       distributed (but sequential) training. If you're doing distributed
       hyperparameter optimization on larger-than-memory datasets, see
       :class:`mars.learn.model_selection.IncrementalSearch`.

    This estimator does not parallelize the training step. It simply calls
    the underlying estimator's ``fit`` method and copies over the learned
    attributes to ``self`` afterwards.

    It is helpful for situations where your training dataset is relatively
    small (fits on a single machine) but you need to predict or transform
    a much larger dataset. ``predict``, ``predict_proba`` and ``transform``
    will be done in parallel (potentially distributed if you've connected
    to a Mars cluster).

    Note that many scikit-learn estimators already predict and transform in
    parallel. This meta-estimator may still be useful in those cases when your
    dataset is larger than memory, as the distributed scheduler will ensure the
    data isn't all read into memory at once.

    See Also
    --------
    Incremental
    mars.learn.model_selection.IncrementalSearch

    Examples
    --------
    >>> from sklearn.ensemble import GradientBoostingClassifier
    >>> from sklearn.datasets import make_classification
    >>> import mars.tensor as mt
    >>> from mars.learn.wrappers import ParallelPostFit

    Make a small 1,000-sample training dataset and fit normally.

    >>> X, y = make_classification(n_samples=1000, random_state=0)
    >>> clf = ParallelPostFit(estimator=GradientBoostingClassifier(),
    ...                       scoring='accuracy')
    >>> clf.fit(X, y)
    ParallelPostFit(estimator=GradientBoostingClassifier(...))

    >>> clf.classes_
    array([0, 1])

    Transform and predict return Mars outputs for Mars inputs.

    >>> X_big, y_big = make_classification(n_samples=100000,
    ...                                    random_state=0)
    >>> X_big, y_big = mt.tensor(X_big), mt.tensor(y_big)
    >>> clf.predict(X_big)
    array([1, 0, 0, ..., 1, 0, 0])

    Which can be computed in parallel.

    >>> clf.predict_proba(X_big)
    array([[0.01780031, 0.98219969],
           [0.62199242, 0.37800758],
           [0.89059934, 0.10940066],
           ...,
           [0.03249968, 0.96750032],
           [0.951434  , 0.048566  ],
           [0.99527114, 0.00472886]])
    """

    def __init__(
        self,
        estimator: SklearnBaseEstimator = None,
        scoring: Union[str, Callable] = None,
    ):
        # Parameters are stored verbatim, with no validation or copying here.
        self.estimator = estimator
        self.scoring = scoring

    def _make_fit(self, method):
        """
        Build a fitting callable that runs ``self.estimator.<method>``.

        Parameters
        ----------
        method : str
            Name of the fitting method to invoke on the wrapped estimator
            (``"fit"`` and ``"partial_fit"`` are the values used in this
            class).

        Returns
        -------
        _fit : callable
            ``_fit(X, y=None, **kwargs)``; runs the method through
            ``mr.spawn``, copies the learned attributes back and returns
            ``self``.
        """

        def _fit(X, y=None, **kwargs):
            # The call is submitted as one remote function and the result is
            # fetched back into this process.
            # NOTE(review): ``resolve_tileable_input=True`` presumably makes
            # Mars inputs arrive as concrete data in ``_wrap`` -- confirm.
            result = (
                mr.spawn(
                    _wrap,
                    args=(self.estimator, method, X, y),
                    kwargs=kwargs,
                    resolve_tileable_input=True,
                )
                .execute()
                .fetch()
            )

            # Copy learned attributes to both the wrapper and the local
            # estimator, because the post-fit methods below call
            # ``self.estimator`` directly.
            copy_learned_attributes(result, self)
            copy_learned_attributes(result, self.estimator)
            return self

        return _fit

    def fit(self, X, y=None, **kwargs):
        """
        Fit the underlying estimator.

        Parameters
        ----------
        X, y : array-like
        **kwargs
            Additional fit-kwargs for the underlying estimator.

        Returns
        -------
        self : object
        """
        return self._make_fit("fit")(X, y=y, **kwargs)

    def partial_fit(self, X, y=None, **kwargs):  # pragma: no cover
        """
        Call ``partial_fit`` on the underlying estimator.

        Parameters
        ----------
        X, y : array-like
        **kwargs
            Additional kwargs for the underlying estimator's
            ``partial_fit``.

        Returns
        -------
        self : object
        """
        return self._make_fit("partial_fit")(X, y=y, **kwargs)

    def _check_method(self, method):
        """
        Check if self.estimator has 'method'.

        Parameters
        ----------
        method : str
            Name of the attribute expected on the wrapped estimator.

        Returns
        -------
        The attribute ``method`` of ``self.estimator``.

        Raises
        ------
        AttributeError
            If the wrapped estimator has no attribute named ``method``.
        """
        estimator = self.estimator
        if not hasattr(estimator, method):
            msg = "The wrapped estimator '{}' does not have a '{}' method.".format(
                estimator, method
            )
            raise AttributeError(msg)
        return getattr(estimator, method)

    def transform(self, X):
        """
        Transform block or partition-wise for Mars inputs.

        The input is passed through ``check_array`` and the estimator's
        ``transform`` is applied chunk by chunk with ``map_chunk``.

        If the underlying estimator does not have a ``transform``
        method, then an ``AttributeError`` is raised.

        Parameters
        ----------
        X : array-like

        Returns
        -------
        transformed : array-like
            The result of ``X.map_chunk``; for Mars inputs, a Mars tensor.
        """
        self._check_method("transform")
        X = check_array(X)
        # Probe the output dtype by transforming a single all-zero row with
        # the same number of columns and dtype as ``X``.
        dtype = self.estimator.transform(np.zeros((1, X.shape[1]), dtype=X.dtype)).dtype
        return X.map_chunk(self.estimator.transform, dtype=dtype)

    def score(self, X, y):
        """
        Returns the score on the given data.

        When ``self.scoring`` is not set, ``"r2"`` is used if the wrapped
        estimator's ``score`` is the regressor-mixin default and
        ``"accuracy"`` if it is the classifier-mixin default. If no scoring
        can be determined, the estimator's own ``score`` method is run
        through ``mr.spawn``.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Input data, where n_samples is the number of samples and
            n_features is the number of features.

        y : array-like, shape = [n_samples] or [n_samples, n_output]
            Target relative to X for classification or regression.

        Returns
        -------
        score
            The executed result of the scorer, or the fetched return value
            of the wrapped estimator's ``score``.
        """
        scoring = self.scoring
        X = check_array(X)
        y = check_array(y, ensure_2d=False)

        if not scoring:
            # Map the stock mixin implementations of ``score`` to the
            # equivalent named scorer. Any other implementation leaves
            # ``scoring`` falsy and falls through to the estimator's own
            # ``score`` below.
            if type(self.estimator).score in (
                RegressorMixin.score,
                SklearnRegressorMixin.score,
            ):  # pragma: no cover
                scoring = "r2"
            elif type(self.estimator).score in (
                ClassifierMixin.score,
                SklearnClassifierMixin.score,
            ):
                scoring = "accuracy"

        if scoring:
            scorer = get_scorer(scoring)
            # The scorer receives ``self`` (not the bare estimator), so it
            # goes through this wrapper's methods.
            return scorer(self, X, y).execute()

        # Fallback: run the wrapped estimator's own ``score`` remotely.
        return (
            mr.spawn(self.estimator.score, args=(X, y)).execute().fetch()
        )  # pragma: no cover

    def predict(self, X, execute=True):
        """
        Predict for X.

        The input is passed through ``check_array`` and the estimator's
        ``predict`` is applied chunk by chunk with ``map_chunk``.

        If the underlying estimator does not have a ``predict``
        method, then an ``AttributeError`` is raised.

        Parameters
        ----------
        X : array-like
        execute : bool, default True
            If true, ``execute()`` is called on the result before it is
            returned; otherwise the un-executed result is returned.

        Returns
        -------
        y : array-like
            The result of ``X.map_chunk``; for Mars inputs, a Mars tensor.
        """
        self._check_method("predict")
        X = check_array(X)
        # NOTE(review): the output dtype is hard-coded to "int". Estimators
        # whose predictions are not integers (regressors, string labels) may
        # get metadata that does not match the data -- confirm how
        # ``map_chunk`` treats a mismatching ``dtype``.
        result = X.map_chunk(self.estimator.predict, dtype="int", shape=X.shape[:1])
        if execute:
            result.execute()
        return result

    def predict_proba(self, X, execute=True):
        """
        Probability estimates.

        The input is passed through ``check_array`` and the estimator's
        ``predict_proba`` is applied chunk by chunk with ``map_chunk``.

        If the underlying estimator does not have a ``predict_proba``
        method, then an ``AttributeError`` is raised.

        Parameters
        ----------
        X : array or dataframe
        execute : bool, default True
            If true, ``execute()`` is called on the result before it is
            returned; otherwise the un-executed result is returned.

        Returns
        -------
        y : array-like
            The result of ``X.map_chunk`` with one column per entry of the
            wrapped estimator's ``classes_``.
        """
        self._check_method("predict_proba")
        X = check_array(X)
        # Declared output shape: one row per sample, one column per class.
        result = X.map_chunk(
            self.estimator.predict_proba,
            dtype="float",
            shape=(X.shape[0], len(self.estimator.classes_)),
        )
        if execute:
            result.execute()
        return result

    def predict_log_proba(self, X, execute=True):
        """
        Log of probability estimates.

        Computed as ``mt.log`` of this wrapper's ``predict_proba``; the
        wrapped estimator's own ``predict_log_proba`` is only checked for
        existence, not called.

        If the underlying estimator does not have a ``predict_log_proba``
        method, then an ``AttributeError`` is raised.

        Parameters
        ----------
        X : array or dataframe
        execute : bool, default True
            If true, ``execute()`` is called on the result before it is
            returned; otherwise the un-executed result is returned.

        Returns
        -------
        y : array-like
        """
        self._check_method("predict_log_proba")
        # Keep the probabilities un-executed so the log joins the same
        # computation.
        result = mt.log(self.predict_proba(X, execute=False))
        if execute:
            result.execute()
        return result