# Source code for mars.learn.utils.checks

# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np

try:
    from sklearn import get_config as get_sklearn_config
except ImportError:  # pragma: no cover
    get_sklearn_config = None

from ... import opcodes as OperandDef
from ... import tensor as mt
from ...core import ENTITY_TYPE, get_output_types, recursive_tile
from ...core.operand import OperandStage
from ...config import options
from ...serialization.serializables import (
    KeyField,
    StringField,
    BoolField,
    DataTypeField,
)
from ...tensor.core import TensorOrder, TENSOR_CHUNK_TYPE
from ...tensor.array_utils import as_same_device, device, issparse, get_array_module
from ...utils import ceildiv
from ..operands import LearnOperand, LearnOperandMixin, OutputType


class CheckBase(LearnOperand, LearnOperandMixin):
    """Base operand for validation checks on tileables.

    Runs a per-chunk check over ``input`` and, once every chunk passes,
    forwards ``value`` (which defaults to the input itself) as the
    result.  Subclasses provide the actual check logic in ``execute``.
    """

    # tileable whose chunks are validated
    _input = KeyField("input")
    # tileable forwarded as the result once the check succeeds
    _value = KeyField("value")
    # error message used by subclasses when the check fails
    _err_msg = StringField("err_msg")

    def __init__(self, input=None, value=None, err_msg=None, output_types=None, **kw):
        super().__init__(
            _input=input,
            _value=value,
            _err_msg=err_msg,
            _output_types=output_types,
            **kw,
        )

    @property
    def input(self):
        return self._input

    @property
    def value(self):
        return self._value

    @property
    def err_msg(self):
        return self._err_msg

    def _set_inputs(self, inputs):
        # Rebind the key fields to the (possibly rebuilt) input entities:
        # the checked tileable comes first, the forwarded value last.
        super()._set_inputs(inputs)
        if self._input is not None:
            self._input = self._inputs[0]
        if self._value is not None:
            self._value = self._inputs[-1]

    def __call__(self, x, value=None):
        # output input if value not specified
        self._value = value = value if value is not None else x
        self.output_types = get_output_types(value)
        self.stage = OperandStage.agg
        return self.new_tileable([x, value], kws=[value.params])

    @classmethod
    def tile(cls, op):
        combine_size = options.combine_size
        x, value = op.input, op.value

        # map stage: one scalar bool check chunk per chunk of ``x``;
        # a failing chunk raises during execution
        check_chunks = []
        for i, chunk in enumerate(x.chunks):
            chunk_op = cls(
                err_msg=op.err_msg,
                stage=OperandStage.map,
                output_types=[OutputType.tensor],
            )
            check_chunk = chunk_op.new_chunk(
                [chunk],
                shape=(),
                index=(i,),
                dtype=np.dtype(bool),
                order=TensorOrder.C_ORDER,
            )
            check_chunks.append(check_chunk)

        # combine stage: tree-reduce the check chunks ``combine_size``
        # at a time until a single chunk remains
        while len(check_chunks) > 1:
            prev_check_chunks = check_chunks
            check_chunks = []
            chunk_size = ceildiv(len(prev_check_chunks), combine_size)
            for i in range(chunk_size):
                chunks = prev_check_chunks[i * combine_size : (i + 1) * combine_size]
                chunk_op = cls(
                    err_msg=op.err_msg,
                    stage=OperandStage.combine,
                    output_types=[OutputType.tensor],
                )
                check_chunk = chunk_op.new_chunk(
                    chunks,
                    shape=(),
                    index=(i,),
                    dtype=np.dtype(bool),
                    order=TensorOrder.C_ORDER,
                )
                check_chunks.append(check_chunk)

        # agg stage: emit one output chunk per chunk of ``value``; the
        # single remaining check chunk is attached as an extra input so
        # that any check failure propagates before values are returned
        check_chunk = check_chunks[0]
        out_chunks = []
        for val_chunk in value.chunks:
            chunk_op = cls(
                value=val_chunk,
                err_msg=op.err_msg,
                stage=OperandStage.agg,
                output_types=op.output_types,
            )
            out_chunk = chunk_op.new_chunk(
                [check_chunk, val_chunk], kws=[val_chunk.params]
            )
            out_chunks.append(out_chunk)

        # output mirrors ``value``'s chunking exactly
        new_op = op.copy()
        kw = op.outputs[0].params
        kw["chunks"] = out_chunks
        kw["nsplits"] = value.nsplits
        return new_op.new_tileables(op.inputs, kws=[kw])


class CheckNonNegative(CheckBase):
    """Check that the input tileable contains no negative values.

    Execution raises ``ValueError`` with ``err_msg`` when a negative
    entry is found; otherwise ``value`` is passed through unchanged.
    """

    _op_type_ = OperandDef.CHECK_NON_NEGATIVE

    # name of the caller, used to build the default error message
    _whom = StringField("whom")

    def __init__(
        self,
        input=None,
        value=None,
        whom=None,
        err_msg=None,
        stage=None,
        gpu=None,
        output_types=None,
        **kw,
    ):
        super().__init__(
            input=input,
            value=value,
            _whom=whom,
            err_msg=err_msg,
            stage=stage,
            output_types=output_types,
            gpu=gpu,
            **kw,
        )
        # derive a default error message from ``whom`` when none given
        if self._err_msg is None and self._whom is not None:
            self._err_msg = f"Negative values in data passed to {self._whom}"

    @property
    def whom(self):
        return self._whom

    @classmethod
    def _execute_tensor(cls, ctx, op):
        # Check one tensor chunk; at the map stage the operand has
        # exactly one input chunk, hence the ``(x,)`` unpack.
        (x,), device_id, xp = as_same_device(
            [ctx[inp.key] for inp in op.inputs], device=op.device, ret_extra=True
        )

        with device(device_id):
            # a sparse chunk with no stored entries holds only implicit
            # zeros, so its minimum is 0
            if issparse(x) and x.nnz == 0:
                x_min = 0
            else:
                x_min = xp.min(x)

            if x_min < 0:
                raise ValueError(op.err_msg)

            ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_df(cls, ctx, op):
        # DataFrame chunk: reduce columns then rows to a scalar minimum
        x = ctx[op.inputs[0].key]
        x_min = x.min().min()
        if x_min < 0:
            raise ValueError(op.err_msg)

        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_map(cls, ctx, op):
        # dispatch on chunk type: tensor chunks vs. DataFrame-likes
        if isinstance(op.inputs[0], TENSOR_CHUNK_TYPE):
            return cls._execute_tensor(ctx, op)
        else:
            return cls._execute_df(ctx, op)

    @classmethod
    def _execute_combine(cls, ctx, op):
        # all input checks executed successfully, so just emit True
        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_agg(cls, ctx, op):
        # every check passed: forward the corresponding value chunk
        ctx[op.outputs[0].key] = ctx[op.value.key]

    @classmethod
    def execute(cls, ctx, op):
        # dispatch by operand stage (map -> combine -> agg)
        if op.stage == OperandStage.map:
            return cls._execute_map(ctx, op)
        elif op.stage == OperandStage.combine:
            return cls._execute_combine(ctx, op)
        else:
            assert op.stage == OperandStage.agg
            return cls._execute_agg(ctx, op)

def check_non_negative_then_return_value(to_check, value, whom):
    """Validate that ``to_check`` has no negative values, then forward
    ``value`` unchanged.

    Parameters
    ----------
    to_check : tileable
        Data scanned for negative entries at execution time.
    value : tileable
        Tileable returned when the check succeeds.
    whom : str
        Caller name, used in the default error message.
    """
    return CheckNonNegative(input=to_check, value=value, whom=whom)(
        to_check, value
    )


class AssertAllFinite(LearnOperand, LearnOperandMixin):
    """Operand asserting that ``x`` contains only finite values.

    Execution raises ``ValueError`` on NaN/infinity (NaN may be allowed
    via ``allow_nan``).  With ``check_only`` the result is a scalar bool
    tensor; otherwise the checked input itself is passed through.
    """

    _op_type_ = OperandDef.ASSERT_ALL_FINITE

    # tileable being checked
    _x = KeyField("x")
    # when True, only infinity triggers the error; NaN is accepted
    _allow_nan = BoolField("allow_nan")
    # dtype named in the error message instead of ``x.dtype``
    _msg_dtype = DataTypeField("msg_dtype")
    # True -> output a scalar bool; False -> pass ``x`` through
    _check_only = BoolField("check_only")
    # chunks
    # precomputed ``isfinite(sum(x))`` chunk for float/complex dtypes
    _is_finite = KeyField("is_finite")
    # precomputed ``(x != x).any()`` chunk for object dtype
    _check_nan = KeyField("check_nan")

    def __init__(
        self,
        x=None,
        allow_nan=None,
        msg_dtype=None,
        check_only=None,
        is_finite=None,
        check_nan=None,
        output_types=None,
        **kw,
    ):
        super().__init__(
            _x=x,
            _allow_nan=allow_nan,
            _msg_dtype=msg_dtype,
            _check_only=check_only,
            _is_finite=is_finite,
            _check_nan=check_nan,
            _output_types=output_types,
            **kw,
        )

    @property
    def x(self):
        return self._x

    @property
    def allow_nan(self):
        return self._allow_nan

    @property
    def msg_dtype(self):
        return self._msg_dtype

    @property
    def check_only(self):
        return self._check_only

    @property
    def is_finite(self):
        return self._is_finite

    @property
    def check_nan(self):
        return self._check_nan

    def _set_inputs(self, inputs):
        # Rebind key fields to rebuilt inputs; only the fields that were
        # set participate, in this fixed order.
        super()._set_inputs(inputs)
        inputs_iter = iter(self._inputs)
        for attr in ("_x", "_is_finite", "_check_nan"):
            if getattr(self, attr) is not None:
                setattr(self, attr, next(inputs_iter))

    @classmethod
    def _assume_finite(cls):
        # Resolve the "assume finite" flag: Mars option first, then
        # scikit-learn's global config if sklearn is importable,
        # defaulting to False.
        assume_finite = options.learn.assume_finite
        if assume_finite is None and get_sklearn_config is not None:
            # get config from scikit-learn
            assume_finite = get_sklearn_config()["assume_finite"]
        if assume_finite is None:  # pragma: no cover
            assume_finite = False

        return assume_finite

    def __call__(self, x):
        if self._assume_finite():
            # skip check; note this returns None in check-only mode
            if self._check_only:
                return
            else:
                return x

        if self._check_only:
            # scalar bool result
            return self.new_tileable(
                [x], dtype=np.dtype(bool), shape=(), order=TensorOrder.C_ORDER
            )
        else:
            # pass-through result with the same params as ``x``
            return self.new_tileable([x], kws=[x.params])

    @classmethod
    def tile(cls, op):
        from .extmath import _safe_accumulator_op

        x = op.x
        out = op.outputs[0]
        is_float = x.dtype.kind in "fc"
        combine_size = options.combine_size

        # Precompute a single global helper chunk shared by all map
        # chunks: for float/complex dtypes, ``isfinite(sum(x))`` is the
        # O(1)-memory fast path; for object dtype (unless NaN allowed),
        # ``(x != x).any()`` detects NaNs.
        is_finite_chunk = check_nan_chunk = None
        if is_float:
            is_finite_chunk = (
                yield from recursive_tile(mt.isfinite(_safe_accumulator_op(mt.sum, x)))
            ).chunks[0]
        elif x.dtype == np.dtype(object) and not op.allow_nan:
            check_nan_chunk = (yield from recursive_tile((x != x).any())).chunks[0]

        # map stage: each chunk of ``x`` plus the helper chunk(s)
        map_chunks = []
        for c in x.chunks:
            chunk_op = op.copy().reset_key()
            chunk_op.stage = OperandStage.map
            chunk_op._is_finite = is_finite_chunk
            chunk_op._check_nan = check_nan_chunk
            chunk_inputs = [c]
            if is_finite_chunk is not None:
                chunk_inputs.append(is_finite_chunk)
            if check_nan_chunk is not None:
                chunk_inputs.append(check_nan_chunk)
            chunk_params = c.params
            if op.check_only:
                # map output degenerates to a scalar bool
                chunk_params["dtype"] = np.dtype(bool)
                chunk_params["shape"] = ()
                if len(x.chunks) == 1:
                    chunk_params["index"] = ()
            map_chunk = chunk_op.new_chunk(chunk_inputs, kws=[chunk_params])
            map_chunks.append(map_chunk)

        new_op = op.copy()
        if not op.check_only:
            # pass-through mode: output mirrors input chunking
            params = out.params
            params["nsplits"] = x.nsplits
            params["chunks"] = map_chunks
            return new_op.new_tileables(op.inputs, kws=[params])

        out_chunks = map_chunks
        # if check only, we use tree reduction to aggregate to one chunk
        while len(out_chunks) > 1:
            size = ceildiv(len(out_chunks), combine_size)
            new_out_chunks = []
            for i in range(size):
                chunk_op = AssertAllFinite(
                    check_only=True,
                    output_types=op.output_types,
                    # the last round (size == 1) is the agg stage
                    stage=OperandStage.combine if size > 1 else OperandStage.agg,
                )
                chunk_index = (i,) if size > 1 else ()
                out_chunk = chunk_op.new_chunk(
                    out_chunks[combine_size * i : combine_size * (i + 1)],
                    dtype=out.dtype,
                    shape=(),
                    index=chunk_index,
                    order=out.order,
                )
                new_out_chunks.append(out_chunk)
            out_chunks = new_out_chunks

        params = out.params
        params["nsplits"] = ()
        params["chunks"] = out_chunks
        return new_op.new_tileables(op.inputs, kws=[params])

    @classmethod
    def _execute_map(cls, ctx, op):
        allow_nan = op.allow_nan
        msg_dtype = op.msg_dtype
        # ``raw`` keeps the untouched chunk for pass-through mode
        raw = x = ctx[op.x.key]
        xp = get_array_module(x, nosparse=True)

        if issparse(x):
            # only stored entries can be non-finite
            x = x.data
        # First try an O(n) time, O(1) space solution for the common case that
        # everything is finite; fall back to O(n) space np.isfinite to prevent
        # false positives from overflow in sum method. The sum is also calculated
        # safely to reduce dtype induced overflows.
        is_float = x.dtype.kind in "fc"
        if is_float and ctx[op.is_finite.key]:
            # global sum was finite -> whole tensor is finite
            pass
        elif is_float:
            msg_err = "Input contains {} or a value too large for {!r}."
            if (
                allow_nan
                and xp.isinf(x).any()
                or not allow_nan
                and not xp.isfinite(x).all()
            ):
                type_err = "infinity" if allow_nan else "NaN, infinity"
                raise ValueError(
                    msg_err.format(
                        type_err, msg_dtype if msg_dtype is not None else x.dtype
                    )
                )
        # for object dtype data, we only check for NaNs
        elif x.dtype == np.dtype(object) and not allow_nan:
            if ctx[op.check_nan.key]:
                raise ValueError("Input contains NaN")

        if op.check_only:
            result = np.array(True)
        else:
            result = raw
        ctx[op.outputs[0].key] = result

    @classmethod
    def _execute_combine_reduce(cls, ctx, op):
        # inputs executed successfully, so just return True
        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def execute(cls, ctx, op):
        if op.stage == OperandStage.map:
            return cls._execute_map(ctx, op)
        else:
            assert op.stage in (OperandStage.combine, OperandStage.agg)
            return cls._execute_combine_reduce(ctx, op)


def assert_all_finite(X, allow_nan=False, msg_dtype=None, check_only=True):
    """Build a lazy check that ``X`` contains only finite values.

    Execution raises ``ValueError`` on NaN/infinity (only infinity when
    ``allow_nan`` is True).

    Parameters
    ----------
    X : array-like or tileable
        Data to validate; converted to a tensor when not already a Mars
        entity.
    allow_nan : bool
        If True, NaN values are accepted and only infinity raises.
    msg_dtype : dtype, optional
        Dtype named in the error message instead of ``X.dtype``.
    check_only : bool
        If True, return a scalar bool tensor; otherwise return ``X``
        itself with the check attached.

    Returns
    -------
    Tileable or None
        ``None`` when ``check_only`` is True and the check is skipped
        because ``assume_finite`` is configured.
    """
    if not isinstance(X, ENTITY_TYPE):
        X = mt.asarray(X)

    # reuse an identical pending check instead of stacking a new operand
    if (
        isinstance(X.op, AssertAllFinite)
        and X.op.allow_nan == allow_nan
        and X.op.msg_dtype == msg_dtype
        and X.op.check_only == check_only
    ):
        return X

    if check_only:
        # scalar bool output, never sparse
        output_types = [OutputType.tensor]
        sparse = False
    else:
        # pass-through: result has the same type and sparsity as ``X``
        output_types = get_output_types(X)
        sparse = X.issparse()

    op = AssertAllFinite(
        x=X,
        allow_nan=allow_nan,
        msg_dtype=msg_dtype,
        check_only=check_only,
        sparse=sparse,
        output_types=output_types,
    )
    return op(X)