Source code for mars.learn.ensemble._blockwise

# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union

import numpy as np
from sklearn.base import BaseEstimator as SklearnBaseEstimator, clone
from sklearn.utils.validation import check_is_fitted

from ... import opcodes
from ... import execute
from ... import tensor as mt
from ...core import OutputType, ENTITY_TYPE, recursive_tile
from ...core.context import Context
from ...serialization.serializables import (
    FieldTypes,
    AnyField,
    BoolField,
    DictField,
    Int64Field,
    ListField,
    StringField,
    KeyField,
)
from ...typing import SessionType
from ...tensor.core import Tensor, TensorOrder
from ...tensor.utils import decide_unify_split
from ..operands import LearnOperand, LearnOperandMixin
from ..base import BaseEstimator, ClassifierMixin, RegressorMixin
from ..utils import check_array


class BlockwiseEnsembleFit(LearnOperand, LearnOperandMixin):
    _op_type_ = opcodes.BLOCKWISE_ENSEMBLE_FIT

    x = KeyField("x")
    y = KeyField("y")
    estimator = AnyField("estimator")
    kwargs = DictField("kwargs", default_factory=dict)

    def __call__(self):
        self._output_types = [OutputType.object]
        return self.new_tileable([self.x, self.y])

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        self.x = self._inputs[0]
        self.y = self._inputs[1]

    @classmethod
    def tile(cls, op: "BlockwiseEnsembleFit"):
        X, y = op.x, op.y
        x_split = X.nsplits[0]
        y_split = y.nsplits[0]
        out = op.outputs[0]

        if any(np.isnan(s) for s in x_split + y_split) or np.isnan(
            X.shape[1]
        ):  # pragma: no cover
            yield

        if x_split != y_split or X.chunk_shape[1] > 1:
            x_split = y_split = decide_unify_split(x_split, y_split)
            X = X.rechunk({0: x_split, 1: X.shape[1]})
            y = y.rechunk({0: y_split})
            X, y = yield from recursive_tile(X, y)

        out_chunks = []
        for i, _ in enumerate(x_split):
            chunk_op = op.copy().reset_key()
            out_chunk = chunk_op.new_chunk(
                [X.cix[i, 0], y.cix[(i,)]],
                index=(i,),
            )
            out_chunks.append(out_chunk)

        params = out.params.copy()
        params["chunks"] = out_chunks
        params["nsplits"] = ((np.nan,) * len(x_split),)
        return op.copy().new_tileables(op.inputs, kws=[params])

    @classmethod
    def execute(cls, ctx: Union[dict, Context], op: "BlockwiseEnsembleFit"):
        x, y = ctx[op.inputs[0].key], ctx[op.inputs[1].key]
        estimator = clone(op.estimator)
        ctx[op.outputs[0].key] = estimator.fit(x, y, **op.kwargs)


class BlockwiseEnsemblePredict(LearnOperand, LearnOperandMixin):
    _op_type_ = opcodes.BLOCKWISE_ENSEMBLE_PREDICT

    x = KeyField("x")
    estimators = ListField("estimators", FieldTypes.key)
    voting = StringField("voting", default="hard")
    proba = BoolField("proba", default=None)
    is_classifier = BoolField("is_classifier")
    n_classes = Int64Field("n_classes")

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        self.x = self._inputs[0]
        self.estimators = self._inputs[1:]

    def __call__(self):
        self._output_types = [OutputType.tensor]
        x_len = self.x.shape[0]
        if self.is_classifier:
            shape = (x_len, self.n_classes) if self.proba else (x_len,)
            dtype = np.dtype(np.float64) if self.proba else np.dtype(np.int64)
        else:
            shape = (x_len,)
            dtype = np.dtype(np.float64)
        return self.new_tileable(
            [self.x] + self.estimators,
            shape=shape,
            dtype=dtype,
            order=TensorOrder.C_ORDER,
        )

    @classmethod
    def tile(cls, op: "BlockwiseEnsemblePredict"):
        out = op.outputs[0]
        x = op.x
        estimators = op.estimators[0]
        estimators_chunks = estimators.chunks

        out_chunks = []
        for chunk in x.chunks:
            chunk_op = op.copy().reset_key()
            if out.ndim == 2:
                chunk_shape = (chunk.shape[0], out.shape[1])
                chunk_index = (chunk.index[0], 0)
            else:
                chunk_shape = (chunk.shape[0],)
                chunk_index = (chunk.index[0],)
            out_chunk = chunk_op.new_chunk(
                [chunk] + estimators_chunks,
                shape=chunk_shape,
                dtype=out.dtype,
                order=out.order,
                index=chunk_index,
            )
            out_chunks.append(out_chunk)

        if out.ndim == 2:
            nsplits = (x.nsplits[0], (out.shape[1],))
        else:
            nsplits = (x.nsplits[0],)
        params = out.params.copy()
        params["nsplits"] = nsplits
        params["chunks"] = out_chunks
        return op.copy().new_tileables(op.inputs, kws=[params])

    @classmethod
    def execute(cls, ctx: Union[dict, Context], op: "BlockwiseEnsemblePredict"):
        x = ctx[op.inputs[0].key]
        estimators = [ctx[inp.key] for inp in op.inputs[1:]]
        if op.proba or op.voting == "soft":
            predictions = [estimator.predict_proba(x) for estimator in estimators]
        else:
            predictions = [estimator.predict(x) for estimator in estimators]

        if op.is_classifier:
            if not op.proba:
                result = cls._execute_classifier_predict(predictions, op)
            else:
                result = cls._execute_classifier_predict_proba(predictions, op)
        else:
            result = cls._execute_regressor_predict(predictions)
        ctx[op.outputs[0].key] = result

    @classmethod
    def _execute_classifier_predict(
        cls, predictions: List[np.ndarray], op: "BlockwiseEnsemblePredict"
    ):
        if op.voting == "soft":
            prob = np.average(np.stack(predictions), axis=0)
            return np.argmax(prob, axis=1)
        else:

            def vote(x: np.ndarray):
                return np.argmax(np.bincount(x))

            # hard voting
            prediction = np.vstack(predictions).T
            return np.apply_along_axis(vote, 1, prediction)

    @classmethod
    def _execute_classifier_predict_proba(
        cls, predictions: List[np.ndarray], op: "BlockwiseEnsemblePredict"
    ):
        assert op.voting == "soft"
        return np.average(np.stack(predictions), axis=0)

    @classmethod
    def _execute_regressor_predict(cls, predictions: List[np.ndarray]):
        return np.average(np.vstack(predictions), axis=0)


class BlockwiseBaseEstimator(BaseEstimator):
    def __init__(self, estimator: SklearnBaseEstimator):
        self.estimator = estimator

    def _fit(self, X, y, **kwargs):
        X = check_array(X)
        op = BlockwiseEnsembleFit(x=X, y=y, estimator=self.estimator, kwargs=kwargs)
        self.estimators_ = op()


[docs]class BlockwiseVotingClassifier(ClassifierMixin, BlockwiseBaseEstimator): """ Blockwise training and ensemble voting classifier. This classifier trains on blocks / partitions of tensors or DataFrames. A cloned version of `estimator` will be fit *independently* on each block or partition of the data. This is useful when the sub estimator only works on small in-memory data structures like a NumPy array or pandas DataFrame. Prediction is done by the *ensemble* of learned models. .. warning:: Ensure that your data are sufficiently shuffled prior to training! If the values of the various blocks / partitions of your dataset are not distributed similarly, the classifier will give poor results. Parameters ---------- estimator : Estimator voting : str, {'hard', 'soft'} (default='hard') If 'hard', uses predicted class labels for majority rule voting. Else if 'soft', predicts the class label based on the argmax of the sums of the predicted probabilities, which is recommended for an ensemble of well-calibrated classifiers. classes : list-like, optional The set of classes that `y` can take. This can also be provided as a fit param if the underlying estimator requires `classes` at fit time. Attributes ---------- estimators_ : list of classifiers The collection of fitted sub-estimators that are `estimator` fitted on each partition / block of the inputs. classes_ : array-like, shape (n_predictions,) The class labels. Examples -------- >>> import mars.tensor as mt >>> from mars.learn.ensemble import BlockwiseVotingClassifier >>> from sklearn.linear_model import RidgeClassifier >>> from sklearn.datasets import make_classification >>> X, y = make_classification(n_samples=100_000) >>> X, y = mt.tensor(X, chunk_size=10_0000), mt.tensor(y, chunk_size=10_0000) >>> subestimator = RidgeClassifier(random_state=0) >>> clf = BlockwiseVotingClassifier(subestimator) >>> clf.fit(X, y) """
[docs] def __init__( self, estimator: SklearnBaseEstimator, voting: str = "hard", classes: Union[np.ndarray, list, Tensor] = None, ): super().__init__(estimator=estimator) if voting not in ("hard", "soft"): # pragma: no cover raise ValueError("`voting` could be hard or soft") self.voting = voting self.classes = None if classes is not None: self.classes = mt.tensor(classes)
def fit( self, X, y, classes: Union[np.ndarray, list, Tensor] = None, session: SessionType = None, run_kwargs: dict = None, **kwargs, ): if not isinstance(y, ENTITY_TYPE): y = mt.tensor(y) if classes is None: classes = self.classes to_execute = [] if classes is None: classes = mt.unique(y) to_execute.append(classes) super()._fit(X, y, **kwargs) to_execute.append(self.estimators_) execute(to_execute, session=session, **(run_kwargs or dict())) self.n_classes_ = len(classes) def predict(self, X, session: SessionType = None, run_kwargs: dict = None): check_is_fitted(self, attributes=["estimators_"]) X = check_array(X) op = BlockwiseEnsemblePredict( x=X, estimators=[self.estimators_], voting=self.voting, proba=False, is_classifier=True, n_classes=self.n_classes_, ) return op().execute(session=session, **(run_kwargs or dict())) def predict_proba(self, X, session: SessionType = None, run_kwargs: dict = None): if self.voting == "hard": raise AttributeError(f'predict_proba is not available when voting="hard"') check_is_fitted(self, attributes=["estimators_"]) X = check_array(X) op = BlockwiseEnsemblePredict( x=X, estimators=[self.estimators_], voting=self.voting, proba=True, is_classifier=True, n_classes=self.n_classes_, ) return op().execute(session=session, **(run_kwargs or dict()))
[docs]class BlockwiseVotingRegressor(RegressorMixin, BlockwiseBaseEstimator): """ Blockwise training and ensemble voting regressor. This regressor trains on blocks / partitions of tensors or DataFrames. A cloned version of `estimator` will be fit *independently* on each block or partition of the data. Prediction is done by the *ensemble* of learned models. .. warning:: Ensure that your data are sufficiently shuffled prior to training! If the values of the various blocks / partitions of your dataset are not distributed similarly, the regressor will give poor results. Parameters ---------- estimator : Estimator Attributes ---------- estimators_ : list of regressors The collection of fitted sub-estimators that are `estimator` fitted on each partition / block of the inputs. Examples -------- >>> import mars.tensor as mt >>> from mars.learn.ensemble import BlockwiseVotingRegressor >>> from sklearn.linear_model import LinearRegression >>> from sklearn.datasets import make_classification >>> X, y = make_classification(n_samples=100_000) >>> X, y = mt.tensor(X, chunk_size=10_0000), mt.tensor(y, chunk_size=10_0000) >>> subestimator = LinearRegression() >>> clf = BlockwiseVotingRegressor(subestimator) >>> clf.fit(X, y) """ def fit(self, X, y, session: SessionType = None, run_kwargs: dict = None, **kwargs): if not isinstance(y, ENTITY_TYPE): y = mt.tensor(y) super()._fit(X, y, **kwargs) self.estimators_.execute(session=session, **(run_kwargs or dict())) def predict(self, X, session: SessionType = None, run_kwargs: dict = None): check_is_fitted(self, attributes=["estimators_"]) X = check_array(X) op = BlockwiseEnsemblePredict( x=X, estimators=[self.estimators_], is_classifier=False ) return op().execute(session=session, **(run_kwargs or dict()))