# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
try:
from sklearn import get_config as get_sklearn_config
except ImportError: # pragma: no cover
get_sklearn_config = None
from ... import opcodes as OperandDef
from ... import tensor as mt
from ...core import ENTITY_TYPE, get_output_types, recursive_tile
from ...core.operand import OperandStage
from ...config import options
from ...serialization.serializables import (
KeyField,
StringField,
BoolField,
DataTypeField,
)
from ...tensor.core import TensorOrder, TENSOR_CHUNK_TYPE
from ...tensor.array_utils import as_same_device, device, issparse, get_array_module
from ...utils import ceildiv
from ..operands import LearnOperand, LearnOperandMixin, OutputType
class CheckBase(LearnOperand, LearnOperandMixin):
    """Validate ``input`` lazily and hand back ``value`` once it passes.

    When tiled, the operand expands into three layers of chunks:

    * ``map``: one boolean scalar chunk per chunk of ``input``; the subclass
      performs its validation here.
    * ``combine``: boolean scalar chunks that merge the map results
      ``options.combine_size`` at a time until a single chunk remains.
    * ``agg``: one chunk per chunk of ``value``, depending on the merged
      check chunk and on that value chunk.

    If no separate ``value`` is supplied to ``__call__``, the checked data
    itself is passed through.
    """

    _input = KeyField("input")
    _value = KeyField("value")
    _err_msg = StringField("err_msg")

    def __init__(self, input=None, value=None, err_msg=None, output_types=None, **kw):
        super().__init__(
            _input=input,
            _value=value,
            _err_msg=err_msg,
            _output_types=output_types,
            **kw,
        )

    @property
    def input(self):
        """Data being validated (first input of the operand)."""
        return self._input

    @property
    def value(self):
        """Data forwarded to the output (last input of the operand)."""
        return self._value

    @property
    def err_msg(self):
        """Message used when validation fails."""
        return self._err_msg

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        # ``input`` is always placed first and ``value`` last; for a tileable
        # created without an explicit value both refer to the same object.
        if self._input is not None:
            self._input = self._inputs[0]
        if self._value is not None:
            self._value = self._inputs[-1]

    def __call__(self, x, value=None):
        if value is None:
            # nothing separate to hand back: forward the checked data
            value = x
        self._value = value
        self.output_types = get_output_types(value)
        self.stage = OperandStage.agg
        return self.new_tileable([x, value], kws=[value.params])

    @classmethod
    def tile(cls, op):
        fan_in = options.combine_size
        data, passthrough = op.input, op.value

        def make_check(stage, inputs, position):
            # Map and combine chunks are both boolean scalars; only the
            # stage differs between them.
            check_op = cls(
                err_msg=op.err_msg,
                stage=stage,
                output_types=[OutputType.tensor],
            )
            return check_op.new_chunk(
                inputs,
                shape=(),
                index=(position,),
                dtype=np.dtype(bool),
                order=TensorOrder.C_ORDER,
            )

        # map stage: validate each chunk of the checked data independently
        level = [
            make_check(OperandStage.map, [in_chunk], pos)
            for pos, in_chunk in enumerate(data.chunks)
        ]

        # combine stage: merge ``fan_in`` checks at a time until one is left
        while len(level) > 1:
            previous = level
            level = [
                make_check(
                    OperandStage.combine,
                    previous[pos * fan_in : (pos + 1) * fan_in],
                    pos,
                )
                for pos in range(ceildiv(len(previous), fan_in))
            ]
        final_check = level[0]

        # agg stage: every chunk of ``value`` depends on the merged check
        out_chunks = []
        for val_chunk in passthrough.chunks:
            agg_op = cls(
                value=val_chunk,
                err_msg=op.err_msg,
                stage=OperandStage.agg,
                output_types=op.output_types,
            )
            out_chunks.append(
                agg_op.new_chunk([final_check, val_chunk], kws=[val_chunk.params])
            )

        params = op.outputs[0].params
        params["chunks"] = out_chunks
        params["nsplits"] = passthrough.nsplits
        return op.copy().new_tileables(op.inputs, kws=[params])
class CheckNonNegative(CheckBase):
    """Raise ``ValueError(err_msg)`` if the checked data has a negative entry.

    The ``map`` stage computes the minimum of each input chunk, which may be
    a tensor (dense or sparse) or a DataFrame. When ``err_msg`` is not given
    but ``whom`` is, the default message
    ``"Negative values in data passed to {whom}"`` is used.
    """

    _op_type_ = OperandDef.CHECK_NON_NEGATIVE

    _whom = StringField("whom")

    def __init__(
        self,
        input=None,
        value=None,
        whom=None,
        err_msg=None,
        stage=None,
        gpu=None,
        output_types=None,
        **kw,
    ):
        super().__init__(
            input=input,
            value=value,
            _whom=whom,
            err_msg=err_msg,
            stage=stage,
            output_types=output_types,
            gpu=gpu,
            **kw,
        )
        if self._err_msg is None and self._whom is not None:
            self._err_msg = f"Negative values in data passed to {self._whom}"

    @property
    def whom(self):
        """Name of the caller, used to build the default error message."""
        return self._whom

    @classmethod
    def _execute_tensor(cls, ctx, op):
        """Raise if a tensor chunk has a negative entry; otherwise store True."""
        (x,), device_id, xp = as_same_device(
            [ctx[inp.key] for inp in op.inputs], device=op.device, ret_extra=True
        )

        with device(device_id):
            # A chunk that stores no values cannot contain a negative number.
            # This covers sparse chunks without non-zeros and dense chunks of
            # size 0 (e.g. from a tensor with a zero-length dimension). For
            # the latter, ``min`` would raise a misleading "zero-size array"
            # ValueError instead of passing the check.
            if issparse(x):
                holds_no_values = x.nnz == 0
            else:
                holds_no_values = x.size == 0
            x_min = 0 if holds_no_values else xp.min(x)

            if x_min < 0:
                raise ValueError(op.err_msg)

        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_df(cls, ctx, op):
        """Raise if a DataFrame chunk has a negative entry; otherwise store True."""
        x = ctx[op.inputs[0].key]
        # column-wise minimum, then the minimum over those
        x_min = x.min().min()
        if x_min < 0:
            raise ValueError(op.err_msg)

        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_map(cls, ctx, op):
        if isinstance(op.inputs[0], TENSOR_CHUNK_TYPE):
            return cls._execute_tensor(ctx, op)
        else:
            return cls._execute_df(ctx, op)

    @classmethod
    def _execute_combine(cls, ctx, op):
        # Reaching this point means every upstream check passed, so just
        # emit True.
        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def _execute_agg(cls, ctx, op):
        # The checks succeeded; forward the value chunk unchanged.
        ctx[op.outputs[0].key] = ctx[op.value.key]

    @classmethod
    def execute(cls, ctx, op):
        if op.stage == OperandStage.map:
            return cls._execute_map(ctx, op)
        elif op.stage == OperandStage.combine:
            return cls._execute_combine(ctx, op)
        else:
            assert op.stage == OperandStage.agg
            return cls._execute_agg(ctx, op)
def check_non_negative_then_return_value(to_check, value, whom):
    """Return ``value`` as a tileable that is gated on ``to_check`` having no negatives.

    The check runs when the result is executed. A negative entry raises
    ``ValueError`` with a message naming ``whom``. If ``value`` is None,
    ``to_check`` itself is passed through.
    """
    return CheckNonNegative(input=to_check, value=value, whom=whom)(to_check, value)
class AssertAllFinite(LearnOperand, LearnOperandMixin):
    """Operand that raises ``ValueError`` when its input holds disallowed values.

    Float/complex input is rejected if it contains infinity, and also NaN
    unless ``allow_nan`` is set. Object-dtype input is only checked for NaN,
    and only when ``allow_nan`` is False. With ``check_only`` the output is a
    boolean scalar tensor; otherwise the output has the input's params and
    carries the input data through unchanged.
    """

    _op_type_ = OperandDef.ASSERT_ALL_FINITE

    _x = KeyField("x")
    _allow_nan = BoolField("allow_nan")
    _msg_dtype = DataTypeField("msg_dtype")
    _check_only = BoolField("check_only")
    # chunks
    # Both are set by ``tile`` on map-stage chunk ops only.
    # ``_is_finite``: scalar chunk telling whether the (safely accumulated) sum
    # over the whole input is finite; used for float/complex input.
    # ``_check_nan``: scalar chunk telling whether any element of the whole
    # object-dtype input compares unequal to itself (true for NaN).
    _is_finite = KeyField("is_finite")
    _check_nan = KeyField("check_nan")

    def __init__(
        self,
        x=None,
        allow_nan=None,
        msg_dtype=None,
        check_only=None,
        is_finite=None,
        check_nan=None,
        output_types=None,
        **kw,
    ):
        super().__init__(
            _x=x,
            _allow_nan=allow_nan,
            _msg_dtype=msg_dtype,
            _check_only=check_only,
            _is_finite=is_finite,
            _check_nan=check_nan,
            _output_types=output_types,
            **kw,
        )

    @property
    def x(self):
        return self._x

    @property
    def allow_nan(self):
        return self._allow_nan

    @property
    def msg_dtype(self):
        return self._msg_dtype

    @property
    def check_only(self):
        return self._check_only

    @property
    def is_finite(self):
        return self._is_finite

    @property
    def check_nan(self):
        return self._check_nan

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        # Inputs are ordered (x, is_finite, check_nan) with absent (None)
        # entries skipped; this matches how ``tile`` builds ``chunk_inputs``.
        inputs_iter = iter(self._inputs)
        for attr in ("_x", "_is_finite", "_check_nan"):
            if getattr(self, attr) is not None:
                setattr(self, attr, next(inputs_iter))

    @classmethod
    def _assume_finite(cls):
        """Return whether finiteness checks should be skipped.

        The Mars option ``options.learn.assume_finite`` wins. When it is None,
        scikit-learn's ``assume_finite`` config is used if scikit-learn is
        importable. Otherwise the default is False.
        """
        assume_finite = options.learn.assume_finite
        if assume_finite is None and get_sklearn_config is not None:
            # get config from scikit-learn
            assume_finite = get_sklearn_config()["assume_finite"]
        if assume_finite is None:  # pragma: no cover
            assume_finite = False

        return assume_finite

    def __call__(self, x):
        """Create the output tileable, or skip the check entirely.

        When checks are assumed unnecessary, this returns None for
        ``check_only`` and ``x`` itself otherwise. Otherwise it returns a
        boolean scalar tileable (``check_only``) or a tileable with ``x``'s
        params.
        """
        if self._assume_finite():
            # skip check
            if self._check_only:
                return
            else:
                return x

        if self._check_only:
            return self.new_tileable(
                [x], dtype=np.dtype(bool), shape=(), order=TensorOrder.C_ORDER
            )
        else:
            return self.new_tileable([x], kws=[x.params])

    @classmethod
    def tile(cls, op):
        """Tile into a global pre-check, per-chunk map checks and, if
        ``check_only``, a tree reduction down to one boolean chunk.

        This is a generator: the global pre-check tensor is tiled through
        ``yield from recursive_tile(...)`` before the map chunks are built.
        """
        from .extmath import _safe_accumulator_op

        x = op.x
        out = op.outputs[0]
        is_float = x.dtype.kind in "fc"
        combine_size = options.combine_size

        is_finite_chunk = check_nan_chunk = None
        if is_float:
            # Global cheap test: if the sum of all values is finite, every
            # value is finite, and map chunks can skip the element-wise scan.
            is_finite_chunk = (
                yield from recursive_tile(mt.isfinite(_safe_accumulator_op(mt.sum, x)))
            ).chunks[0]
        elif x.dtype == np.dtype(object) and not op.allow_nan:
            # ``x != x`` is True for NaN, so this detects any NaN in x.
            check_nan_chunk = (yield from recursive_tile((x != x).any())).chunks[0]

        map_chunks = []
        for c in x.chunks:
            chunk_op = op.copy().reset_key()
            chunk_op.stage = OperandStage.map
            chunk_op._is_finite = is_finite_chunk
            chunk_op._check_nan = check_nan_chunk
            # Order must agree with ``_set_inputs``: x, is_finite, check_nan.
            chunk_inputs = [c]
            if is_finite_chunk is not None:
                chunk_inputs.append(is_finite_chunk)
            if check_nan_chunk is not None:
                chunk_inputs.append(check_nan_chunk)
            chunk_params = c.params
            if op.check_only:
                # Each map chunk yields a boolean scalar instead of data.
                chunk_params["dtype"] = np.dtype(bool)
                chunk_params["shape"] = ()
                if len(x.chunks) == 1:
                    # The single map chunk is the final scalar result.
                    chunk_params["index"] = ()
            map_chunk = chunk_op.new_chunk(chunk_inputs, kws=[chunk_params])
            map_chunks.append(map_chunk)

        new_op = op.copy()
        if not op.check_only:
            # Pass-through mode: map chunks carry the data, same layout as x.
            params = out.params
            params["nsplits"] = x.nsplits
            params["chunks"] = map_chunks
            return new_op.new_tileables(op.inputs, kws=[params])

        out_chunks = map_chunks
        # if check only, we use tree reduction to aggregate to one chunk
        while len(out_chunks) > 1:
            size = ceildiv(len(out_chunks), combine_size)
            new_out_chunks = []
            for i in range(size):
                chunk_op = AssertAllFinite(
                    check_only=True,
                    output_types=op.output_types,
                    # the last reduction level (single chunk) is the agg stage
                    stage=OperandStage.combine if size > 1 else OperandStage.agg,
                )
                chunk_index = (i,) if size > 1 else ()
                out_chunk = chunk_op.new_chunk(
                    out_chunks[combine_size * i : combine_size * (i + 1)],
                    dtype=out.dtype,
                    shape=(),
                    index=chunk_index,
                    order=out.order,
                )
                new_out_chunks.append(out_chunk)
            out_chunks = new_out_chunks

        params = out.params
        params["nsplits"] = ()
        params["chunks"] = out_chunks
        return new_op.new_tileables(op.inputs, kws=[params])

    @classmethod
    def _execute_map(cls, ctx, op):
        """Check one chunk of x and raise ``ValueError`` on disallowed values.

        Stores ``np.array(True)`` when ``check_only``; otherwise it stores the
        chunk data unchanged.
        """
        allow_nan = op.allow_nan
        msg_dtype = op.msg_dtype
        # ``raw`` keeps the original chunk (possibly sparse) for pass-through.
        raw = x = ctx[op.x.key]
        # NOTE(review): presumably ``nosparse=True`` yields the dense array
        # module (numpy/cupy) even for sparse input — confirm.
        xp = get_array_module(x, nosparse=True)

        if issparse(x):
            # only the stored values of a sparse chunk need checking
            x = x.data
        # First try an O(n) time, O(1) space solution for the common case that
        # everything is finite; fall back to O(n) space np.isfinite to prevent
        # false positives from overflow in sum method. The sum is also calculated
        # safely to reduce dtype induced overflows.
        is_float = x.dtype.kind in "fc"
        if is_float and ctx[op.is_finite.key]:
            # global sum is finite, so every value is finite
            pass
        elif is_float:
            msg_err = "Input contains {} or a value too large for {!r}."
            if (
                allow_nan
                and xp.isinf(x).any()
                or not allow_nan
                and not xp.isfinite(x).all()
            ):
                type_err = "infinity" if allow_nan else "NaN, infinity"
                raise ValueError(
                    msg_err.format(
                        type_err, msg_dtype if msg_dtype is not None else x.dtype
                    )
                )
        # for object dtype data, we only check for NaNs
        elif x.dtype == np.dtype(object) and not allow_nan:
            # result of the global NaN pre-check computed in ``tile``
            if ctx[op.check_nan.key]:
                raise ValueError("Input contains NaN")

        if op.check_only:
            result = np.array(True)
        else:
            result = raw

        ctx[op.outputs[0].key] = result

    @classmethod
    def _execute_combine_reduce(cls, ctx, op):
        # just return True
        # (all upstream map checks have already succeeded if this runs)
        ctx[op.outputs[0].key] = np.array(True)

    @classmethod
    def execute(cls, ctx, op):
        if op.stage == OperandStage.map:
            return cls._execute_map(ctx, op)
        else:
            assert op.stage in (OperandStage.combine, OperandStage.agg)
            return cls._execute_combine_reduce(ctx, op)
def assert_all_finite(X, allow_nan=False, msg_dtype=None, check_only=True):
    """Lazily assert that ``X`` contains no NaN or infinity.

    Parameters
    ----------
    X : tileable or array-like
        Data to check. Inputs that are not already Mars entities are
        converted with ``mt.asarray``.
    allow_nan : bool, default False
        If True, NaN values are accepted and only infinity is rejected.
        Object-dtype data is not checked at all in this case.
    msg_dtype : dtype, optional
        Dtype to report in the error message instead of the data's own dtype.
    check_only : bool, default True
        If True, return a boolean scalar tensor that performs the check when
        executed. If False, return a tileable with the same params as ``X``
        that yields ``X``'s data once the check has passed.

    Returns
    -------
    tileable or None
        * ``X`` unchanged if it was already produced by an ``AssertAllFinite``
          operand with the same ``allow_nan``, ``msg_dtype`` and
          ``check_only``.
        * If checks are disabled via ``options.learn.assume_finite`` (or
          scikit-learn's ``assume_finite`` config when the Mars option is
          None): None for ``check_only=True``, otherwise ``X``.
        * Otherwise the checking tileable described under ``check_only``.

    Raises
    ------
    ValueError
        When the returned tileable is executed and ``X`` contains values that
        are not allowed.
    """
    if not isinstance(X, ENTITY_TYPE):
        X = mt.asarray(X)

    # Avoid stacking an identical check on data that was already checked.
    if (
        isinstance(X.op, AssertAllFinite)
        and X.op.allow_nan == allow_nan
        and X.op.msg_dtype == msg_dtype
        and X.op.check_only == check_only
    ):
        return X

    if check_only:
        # the result is a dense boolean scalar regardless of X
        output_types = [OutputType.tensor]
        sparse = False
    else:
        # pass-through: the result mirrors X's type and sparsity
        output_types = get_output_types(X)
        sparse = X.issparse()

    op = AssertAllFinite(
        x=X,
        allow_nan=allow_nan,
        msg_dtype=msg_dtype,
        check_only=check_only,
        sparse=sparse,
        output_types=output_types,
    )
    return op(X)