# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from typing import Any, List, Tuple, Type
import numpy as np
from ... import opcodes as OperandDef
from ...core import TILEABLE_TYPE
from ...core.operand import OperandStage
from ...serialization.serializables import StringField, AnyField, Int64Field, Int32Field
from ...typing import TileableType, ChunkType
from ...config import options
from ...utils import has_unknown_shape
from ..operands import TensorOperand, TensorOperandMixin
from ..core import TENSOR_TYPE, TensorOrder
from ..datasource.array import tensor as astensor
from ..array_utils import as_same_device, device
class TensorSearchsorted(TensorOperand, TensorOperandMixin):
    """Operand computing ``searchsorted`` over a sorted 1-d tensor.

    For each value in ``v`` this returns the index at which the value could
    be inserted into the sorted tensor ``a`` so that ``a`` stays sorted.
    When ``a`` has a single chunk the work is delegated directly to
    ``np.searchsorted``; otherwise every chunk of ``a`` searches
    independently (map stage) and the per-chunk candidate indices are
    tree-reduced with min (``side='left'``) or max (``side='right'``).
    """

    _op_type_ = OperandDef.SEARCHSORTED

    v = AnyField("v")  # values to insert; a scalar or a tileable
    side = StringField("side")  # 'left' or 'right'
    combine_size = Int32Field("combine_size")  # fan-in of the tree reduction
    # for chunk
    offset = Int64Field("offset")  # global start index of this chunk of `a`
    size = Int64Field("size")  # total length of `a`
    n_chunk = Int64Field("n_chunk")  # number of chunks `a` is split into

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        # A tileable `v` travels as the second input; rebind the field so it
        # refers to the processed input after graph construction.
        if isinstance(self.v, TILEABLE_TYPE):
            self.v = self._inputs[1]

    def __call__(self, a, v):
        inputs = [a]
        if isinstance(v, TILEABLE_TYPE):
            inputs.append(v)
            shape = v.shape
        else:
            # scalar `v` produces a 0-d result tensor
            shape = ()
        return self.new_tensor(inputs, shape=shape, order=TensorOrder.C_ORDER)

    @classmethod
    def _tile_one_chunk(cls, op, a, v, out):
        # `a` fits into a single chunk: emit one output chunk per chunk of
        # `v` (or a single chunk when `v` is a scalar), each one searching
        # the whole of `a`.
        chunks = []
        if len(op.inputs) == 1:
            v_chunks = [v]
        else:
            v_chunks = v.chunks
        for v_chunk in v_chunks:
            chunk_op = op.copy().reset_key()
            in_chunks = [a.chunks[0]]
            if len(op.inputs) == 2:
                in_chunks.append(v_chunk)
            v_shape = v_chunk.shape if hasattr(v_chunk, "shape") else ()
            chunk_idx = v_chunk.index if len(op.inputs) == 2 else (0,)
            chunk = chunk_op.new_chunk(
                in_chunks, shape=v_shape, index=chunk_idx, order=out.order
            )
            chunks.append(chunk)
        new_op = op.copy().reset_key()
        nsplits = ((s,) for s in out.shape) if len(op.inputs) == 1 else v.nsplits
        return new_op.new_tensors(op.inputs, out.shape, chunks=chunks, nsplits=nsplits)

    @classmethod
    def _combine_chunks(
        cls,
        to_combine: List[ChunkType],
        op_type: Type,
        v: Any,
        stage: OperandStage,
        chunk_index: Tuple[int],
    ):
        """Stack per-chunk candidate indices and reduce them along axis 0.

        ``op_type`` is ``TensorMin`` for ``side='left'`` and ``TensorMax``
        for ``side='right'`` (see ``_tile_tree_reduction``).
        """
        from ..merge import TensorStack

        dtype = np.dtype(np.intp)
        v_shape = v.shape if hasattr(v, "shape") else ()
        combine_op = TensorStack(axis=0, dtype=dtype)
        combine_chunk = combine_op.new_chunk(to_combine, shape=v_shape)
        chunk_op = op_type(dtype=dtype, axis=(0,), stage=stage)
        return chunk_op.new_chunk(
            [combine_chunk], shape=v_shape, index=chunk_index, order=TensorOrder.C_ORDER
        )

    @classmethod
    def _tile_tree_reduction(
        cls, op: "TensorSearchsorted", a: TileableType, v: Any, out: TileableType
    ):
        from ..indexing import TensorSlice
        from ..merge import TensorConcatenate
        from ..reduction import TensorMax, TensorMin

        if has_unknown_shape(a):
            yield
        combine_size = op.combine_size or options.combine_size
        n_chunk = len(a.chunks)
        input_len = len(op.inputs)
        v_chunks = [v] if input_len == 1 else v.chunks
        # global start index of each chunk of `a`
        cum_nsplits = [0] + np.cumsum(a.nsplits[0]).tolist()
        input_chunks = []
        offsets = []
        for i in range(n_chunk):
            offset = cum_nsplits[i]
            cur_chunk = a.chunks[i]
            # Size of the data this chunk will actually search.  Starts from
            # the chunk's own length and grows by one for each neighbouring
            # boundary element concatenated in below.
            # (Fixed: was `a.shape[0]`, the total tensor length, which gave
            # every concatenated chunk a wrong shape.)
            chunk_size = cur_chunk.shape[0]
            chunks = []
            if i > 0:
                # prepend the last element of the previous chunk so values
                # falling on the chunk boundary are searched correctly
                last_chunk = a.chunks[i - 1]
                if last_chunk.shape[0] > 0:
                    slice_chunk_op = TensorSlice(
                        slices=[slice(-1, None)], dtype=cur_chunk.dtype
                    )
                    slice_chunk = slice_chunk_op.new_chunk(
                        [last_chunk], shape=(1,), order=out.order
                    )
                    chunks.append(slice_chunk)
                    chunk_size += 1
                    # the borrowed element sits one position before this
                    # chunk's global start
                    offset -= 1
            chunks.append(cur_chunk)
            if i < n_chunk - 1:
                # likewise append the first element of the next chunk
                next_chunk = a.chunks[i + 1]
                if next_chunk.shape[0] > 0:
                    slice_chunk_op = TensorSlice(
                        slices=[slice(1)], dtype=cur_chunk.dtype
                    )
                    slice_chunk = slice_chunk_op.new_chunk(
                        [next_chunk], shape=(1,), order=out.order
                    )
                    chunks.append(slice_chunk)
                    chunk_size += 1
            concat_op = TensorConcatenate(dtype=cur_chunk.dtype)
            concat_chunk = concat_op.new_chunk(
                chunks, shape=(chunk_size,), order=out.order, index=cur_chunk.index
            )
            input_chunks.append(concat_chunk)
            offsets.append(offset)
        out_chunks = []
        for v_chunk in v_chunks:
            chunks = []
            v_shape = v_chunk.shape if hasattr(v_chunk, "shape") else ()
            v_index = v_chunk.index if hasattr(v_chunk, "index") else (0,)
            # map stage: every (boundary-extended) chunk of `a` searches for
            # this chunk of `v` and reports a global candidate index
            for inp_chunk, offset in zip(input_chunks, offsets):
                chunk_op = op.copy().reset_key()
                chunk_op.stage = OperandStage.map
                chunk_op.offset = offset
                chunk_op.n_chunk = n_chunk
                chunk_op.size = a.shape[0]
                chunk_inputs = [inp_chunk]
                if input_len > 1:
                    chunk_inputs.append(v_chunk)
                map_chunk = chunk_op.new_chunk(
                    chunk_inputs, shape=v_shape, index=inp_chunk.index, order=out.order
                )
                chunks.append(map_chunk)
            # tree-reduce the candidates; the miss sentinels written by
            # `_execute_map` make min/max pick the real hit
            op_type = TensorMax if op.side == "right" else TensorMin
            while len(chunks) > combine_size:
                new_chunks = []
                it = itertools.count(0)
                while True:
                    j = next(it)
                    to_combine = chunks[j * combine_size : (j + 1) * combine_size]
                    if len(to_combine) == 0:
                        break
                    new_chunks.append(
                        cls._combine_chunks(
                            to_combine, op_type, v_chunk, OperandStage.combine, (j,)
                        )
                    )
                chunks = new_chunks
            chunk = cls._combine_chunks(
                chunks, op_type, v_chunk, OperandStage.agg, v_index
            )
            out_chunks.append(chunk)
        new_op = op.copy().reset_key()
        nsplits = ((s,) for s in out.shape) if len(op.inputs) == 1 else v.nsplits
        return new_op.new_tensors(
            op.inputs, out.shape, chunks=out_chunks, nsplits=nsplits
        )

    @classmethod
    def tile(cls, op):
        a = op.inputs[0]
        out = op.outputs[0]
        input_len = len(op.inputs)
        if input_len == 1:
            v = op.v
        else:
            v = op.inputs[1]
        if len(a.chunks) == 1:
            return cls._tile_one_chunk(op, a, v, out)
        return (yield from cls._tile_tree_reduction(op, a, v, out))

    @classmethod
    def _execute_without_stage(cls, xp, a, v, op):
        # single-chunk path: plain searchsorted on the whole array
        return xp.searchsorted(a, v, side=op.side)

    @classmethod
    def _execute_map(cls, xp: Any, a: np.ndarray, v: Any, op: "TensorSearchsorted"):
        out = op.outputs[0]
        i = out.index[0]
        side = op.side
        raw_v = v
        v = xp.atleast_1d(v)
        searched = xp.searchsorted(a, v, side=op.side)
        # convert the chunk-local indices into global ones
        xp.add(searched, op.offset, out=searched)
        a_min, a_max = a[0], a[-1]
        # A value "misses" this chunk when its true insertion point cannot
        # lie inside it; such entries get a sentinel so the later min/max
        # reduction ignores them.
        if i == 0:
            # the first chunk
            if a_min == a_max:
                miss = v > a_max
            else:
                miss = v > a_max if side == "left" else v >= a_max
        elif i == op.n_chunk - 1:
            # the last chunk
            if a_min == a_max:
                miss = v < a_min
            else:
                miss = v <= a_min if side == "left" else v < a_min
        else:
            # middle chunk: only responsible for values strictly inside its
            # (boundary-extended) range
            if side == "left" and a_min < a_max:
                miss = (v <= a_min) | (v > a_max)
            elif a_min < a_max:
                miss = (v < a_min) | (v >= a_max)
            else:
                assert a_min == a_max
                miss = v != a_min
        if side == "right":
            # reduced with max -> -1 never wins
            searched[miss] = -1
        else:
            # reduced with min -> size + 1 never wins
            searched[miss] = op.size + 1
        return searched[0] if np.isscalar(raw_v) else searched

    @classmethod
    def execute(cls, ctx, op):
        a = ctx[op.inputs[0].key]
        v = ctx[op.inputs[1].key] if len(op.inputs) == 2 else op.v
        data = []
        # NOTE(review): `a` arriving as a tuple is kept as a pair of arrays
        # below — presumably a composite chunk payload; confirm upstream.
        if isinstance(a, tuple):
            data.extend(a)
        else:
            data.append(a)
        if len(op.inputs) == 2:
            data.append(v)
        data, device_id, xp = as_same_device(data, device=op.device, ret_extra=True)
        if isinstance(a, tuple):
            a = data[:2]
        else:
            a = data[0]
        if len(op.inputs) == 2:
            v = data[-1]
        with device(device_id):
            if op.stage is None:
                ret = cls._execute_without_stage(xp, a, v, op)
            else:
                # only map chunks carry a stage here; combine/agg chunks are
                # min/max reduction operands built in `_combine_chunks`
                assert op.stage == OperandStage.map
                ret = cls._execute_map(xp, a, v, op)
        ctx[op.outputs[0].key] = ret
def searchsorted(a, v, side="left", sorter=None, combine_size=None):
    """
    Find indices where elements should be inserted to maintain order.

    Find the indices into a sorted tensor `a` such that, if the
    corresponding elements in `v` were inserted before the indices, the
    order of `a` would be preserved.

    Assuming that `a` is sorted:

    ====== ============================
    `side` returned index `i` satisfies
    ====== ============================
    left   ``a[i-1] < v <= a[i]``
    right  ``a[i-1] <= v < a[i]``
    ====== ============================

    Parameters
    ----------
    a : 1-D array_like
        Input tensor. If `sorter` is None, then it must be sorted in
        ascending order, otherwise `sorter` must be an array of indices
        that sort it.
    v : array_like
        Values to insert into `a`.
    side : {'left', 'right'}, optional
        If 'left', the index of the first suitable location found is given.
        If 'right', return the last such index. If there is no suitable
        index, return either 0 or N (where N is the length of `a`).
    sorter : 1-D array_like, optional
        Optional tensor of integer indices that sort array a into ascending
        order. They are typically the result of argsort.
    combine_size: int, optional
        The number of chunks to combine.

    Returns
    -------
    indices : tensor of ints
        Array of insertion points with the same shape as `v`.

    See Also
    --------
    sort : Return a sorted copy of a tensor.
    histogram : Produce histogram from 1-D data.

    Notes
    -----
    Binary search is used to find the required insertion points.

    This function is a faster version of the builtin python `bisect.bisect_left`
    (``side='left'``) and `bisect.bisect_right` (``side='right'``) functions,
    which is also vectorized in the `v` argument.

    Examples
    --------
    >>> import mars.tensor as mt

    >>> mt.searchsorted([1,2,3,4,5], 3).execute()
    2
    >>> mt.searchsorted([1,2,3,4,5], 3, side='right').execute()
    3
    >>> mt.searchsorted([1,2,3,4,5], [-10, 10, 2, 3]).execute()
    array([0, 5, 1, 2])
    """
    if (
        not isinstance(a, TENSOR_TYPE)
        and sorter is not None
        and not isinstance(sorter, TENSOR_TYPE)
    ):
        # both `a` and `sorter` are local array-likes: apply the sort order
        # eagerly before building the tensor
        a = astensor(np.asarray(a)[sorter])
    else:
        a = astensor(a)
        if sorter is not None:
            # apply the sort order lazily through fancy indexing
            a = a[sorter]
    if a.ndim != 1:
        raise ValueError("`a` should be 1-d tensor")
    if a.issparse():
        # does not support sparse tensor
        raise ValueError("`a` should be a dense tensor")
    if side not in {"left", "right"}:
        raise ValueError(f"'{side}' is an invalid value for keyword 'side'")
    if not np.isscalar(v):
        v = astensor(v)
    op = TensorSearchsorted(
        v=v, side=side, dtype=np.dtype(np.intp), combine_size=combine_size
    )
    return op(a, v)