# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import numpy as np
from ... import opcodes as OperandDef
from ...config import options
from ...operands import OperandStage
from ...lib import sparse
from ...lib.sparse.core import get_array_module as get_sparse_array_module
from ...serialize import BoolField, Int32Field, Int64Field
from ...tiles import TilesError
from ...utils import get_shuffle_input_keys_idxes, check_chunks_unknown_shape
from ..operands import TensorMapReduceOperand, TensorOperandMixin, TensorShuffleProxy
from ..array_utils import as_same_device, device
from ..core import TensorOrder
from ..utils import validate_axis, hash_on_axis
class TensorUnique(TensorMapReduceOperand, TensorOperandMixin):
_op_type_ = OperandDef.UNIQUE
_return_index = BoolField('return_index')
_return_inverse = BoolField('return_inverse')
_return_counts = BoolField('return_counts')
_axis = Int32Field('axis')
_aggregate_size = Int32Field('aggregate_size')
_aggregate_id = Int32Field('aggregate_id')
_start_pos = Int64Field('start_pos')
def __init__(self, return_index=None, return_inverse=None, return_counts=None,
axis=None, start_pos=None, stage=None, shuffle_key=None,
dtype=None, gpu=None, aggregate_id=None, aggregate_size=None, **kw):
super().__init__(_return_index=return_index, _return_inverse=return_inverse,
_return_counts=return_counts, _axis=axis, _start_pos=start_pos,
_aggregate_id=aggregate_id, _aggregate_size=aggregate_size,
_stage=stage, _shuffle_key=shuffle_key, _dtype=dtype, _gpu=gpu, **kw)
@property
def output_limit(self):
if self.stage == OperandStage.map:
return 1
return 1 + bool(self._return_index) + \
bool(self._return_inverse) + bool(self._return_counts)
@property
def return_index(self):
return self._return_index
@property
def return_inverse(self):
return self._return_inverse
@property
def return_counts(self):
return self._return_counts
@property
def axis(self):
return self._axis
@property
def aggregate_size(self):
return self._aggregate_size
@property
def aggregate_id(self):
return self._aggregate_id
@property
def start_pos(self):
return self._start_pos
@classmethod
def _gen_kws(cls, op, input_obj, chunk=False, chunk_index=None):
kws = []
# unique tensor
shape = list(input_obj.shape)
shape[op.axis] = np.nan
kw = {'shape': tuple(shape),
'dtype': input_obj.dtype,
'gpu': input_obj.op.gpu}
if chunk:
idx = [0, ] * len(shape)
idx[op.axis] = chunk_index or 0
kw['index'] = tuple(idx)
kws.append(kw)
# unique indices tensor
if op.return_index:
kw = {'shape': (np.nan,),
'dtype': np.dtype(np.intp),
'gpu': input_obj.op.gpu,
'type': 'indices'}
if chunk:
kw['index'] = (chunk_index or 0,)
kws.append(kw)
# unique inverse tensor
if op.return_inverse:
kw = {'shape': (input_obj.shape[op.axis],),
'dtype': np.dtype(np.intp),
'gpu': input_obj.op.gpu,
'type': 'inverse'}
if chunk:
kw['index'] = (chunk_index or 0,)
kws.append(kw)
# unique counts tensor
if op.return_counts:
kw = {'shape': (np.nan,),
'dtype': np.dtype(np.int_),
'gpu': input_obj.op.gpu,
'type': 'counts'}
if chunk:
kw['index'] = (chunk_index or 0,)
kws.append(kw)
return kws
def __call__(self, ar):
from .atleast_1d import atleast_1d
ar = atleast_1d(ar)
if self.axis is None:
if ar.ndim > 1:
ar = ar.flatten()
self._axis = 0
else:
self._axis = validate_axis(ar.ndim, self._axis)
kws = self._gen_kws(self, ar)
tensors = self.new_tensors([ar], kws=kws, order=TensorOrder.C_ORDER)
if len(tensors) == 1:
return tensors[0]
return tensors
@classmethod
def _tile_one_chunk(cls, op):
outs = op.outputs
ins = op.inputs
chunk_op = op.copy().reset_key()
in_chunk = ins[0].chunks[0]
kws = cls._gen_kws(chunk_op, in_chunk, chunk=True)
out_chunks = chunk_op.new_chunks([in_chunk], kws=kws, order=outs[0].order)
new_op = op.copy()
kws = [out.params.copy() for out in outs]
for kw, out_chunk in zip(kws, out_chunks):
kw['chunks'] = [out_chunk]
kw['nsplits'] = tuple((s,) for s in out_chunk.shape)
return new_op.new_tensors(ins, kws=kws, order=outs[0].order)
@classmethod
def _tile_via_shuffle(cls, op):
# rechunk the axes except the axis to do unique into 1 chunk
inp = op.inputs[0]
if inp.ndim > 1:
new_chunk_size = dict()
for axis in range(inp.ndim):
if axis == op.axis:
continue
if np.isnan(inp.shape[axis]):
raise TilesError(f'input tensor has unknown shape on axis {axis}')
new_chunk_size[axis] = inp.shape[axis]
check_chunks_unknown_shape([inp], TilesError)
inp = inp.rechunk(new_chunk_size)._inplace_tile()
aggregate_size = op.aggregate_size
if aggregate_size is None:
aggregate_size = max(inp.chunk_shape[op.axis] // options.combine_size, 1)
unique_on_chunk_sizes = inp.nsplits[op.axis]
start_poses = np.cumsum((0,) + unique_on_chunk_sizes).tolist()[:-1]
map_chunks = []
for c in inp.chunks:
map_op = TensorUnique(stage=OperandStage.map,
return_index=op.return_index,
return_inverse=op.return_inverse,
return_counts=op.return_counts,
axis=op.axis, aggregate_size=aggregate_size,
start_pos=start_poses[c.index[op.axis]],
dtype=inp.dtype)
shape = list(c.shape)
shape[op.axis] = np.nan
map_chunks.append(map_op.new_chunk([c], shape=tuple(shape), index=c.index))
shuffle_chunk = TensorShuffleProxy(dtype=inp.dtype, _tensor_keys=[inp.op.key]) \
.new_chunk(map_chunks, shape=())
reduce_chunks = [list() for _ in range(len(op.outputs))]
for i in range(aggregate_size):
reduce_op = TensorUnique(stage=OperandStage.reduce,
return_index=op.return_index,
return_inverse=op.return_inverse,
return_counts=op.return_counts,
axis=op.axis, aggregate_id=i,
shuffle_key=str(i))
kws = cls._gen_kws(op, inp, chunk=True, chunk_index=i)
chunks = reduce_op.new_chunks([shuffle_chunk], kws=kws,
order=op.outputs[0].order)
for j, c in enumerate(chunks):
reduce_chunks[j].append(c)
if op.return_inverse:
inverse_pos = 2 if op.return_index else 1
map_inverse_chunks = reduce_chunks[inverse_pos]
inverse_shuffle_chunk = TensorShuffleProxy(
dtype=map_inverse_chunks[0].dtype).new_chunk(map_inverse_chunks, shape=())
inverse_chunks = []
for j, cs in enumerate(unique_on_chunk_sizes):
chunk_op = TensorUniqueInverseReduce(dtype=map_inverse_chunks[0].dtype,
shuffle_key=str(j))
inverse_chunk = chunk_op.new_chunk([inverse_shuffle_chunk], shape=(cs,),
index=(j,))
inverse_chunks.append(inverse_chunk)
reduce_chunks[inverse_pos] = inverse_chunks
kws = [out.params for out in op.outputs]
for kw, chunks in zip(kws, reduce_chunks):
kw['chunks'] = chunks
unique_nsplits = list(inp.nsplits)
unique_nsplits[op.axis] = (np.nan,) * len(reduce_chunks[0])
kws[0]['nsplits'] = tuple(unique_nsplits)
i = 1
if op.return_index:
kws[i]['nsplits'] = ((np.nan,) * len(reduce_chunks[i]),)
i += 1
if op.return_inverse:
kws[i]['nsplits'] = (inp.nsplits[op.axis],)
i += 1
if op.return_counts:
kws[i]['nsplits'] = ((np.nan,) * len(reduce_chunks[i]),)
new_op = op.copy()
return new_op.new_tensors(op.inputs, kws=kws)
@classmethod
def tile(cls, op):
if len(op.inputs[0].chunks) == 1:
return cls._tile_one_chunk(op)
else:
return cls._tile_via_shuffle(op)
@classmethod
def _execute_map(cls, ctx, op):
(ar,), device_id, xp = as_same_device(
[ctx[c.key] for c in op.inputs], device=op.device, ret_extra=True)
n_reducer = op.aggregate_size
with device(device_id):
results = xp.unique(ar, return_index=op.return_index,
return_inverse=op.return_inverse,
return_counts=op.return_counts,
axis=op.axis)
results = (results,) if not isinstance(results, tuple) else results
results_iter = iter(results)
unique_ar = next(results_iter)
indices_ar = next(results_iter) + op.start_pos if op.return_index else None
inverse_ar = next(results_iter) if op.return_inverse else None
counts_ar = next(results_iter) if op.return_counts else None
if xp is sparse:
dense_xp = get_sparse_array_module(unique_ar)
else:
dense_xp = xp
unique_index = dense_xp.arange(unique_ar.shape[op.axis]) \
if inverse_ar is not None else None
if unique_ar.size > 0:
unique_reducers = dense_xp.asarray(
hash_on_axis(unique_ar, op.axis, n_reducer))
else:
unique_reducers = dense_xp.empty_like(unique_ar)
ind_ar = dense_xp.arange(ar.shape[op.axis])
for reducer in range(n_reducer):
res = []
cond = unique_reducers == reducer
# unique
slc = (slice(None),) * op.axis + (cond,)
res.append(unique_ar[slc])
# indices
if indices_ar is not None:
res.append(indices_ar[cond])
# inverse
if inverse_ar is not None:
index_selected = unique_index[cond]
inv_cond = xp.isin(inverse_ar, index_selected)
inv_selected = xp.searchsorted(index_selected, inverse_ar[inv_cond])
ind_selected = ind_ar[inv_cond]
res.append(xp.stack([ind_selected, inv_selected]))
# counts
if counts_ar is not None:
res.append(counts_ar[cond])
ctx[(op.outputs[0].key, str(reducer))] = tuple(res)
@classmethod
def _execute_reduce(cls, ctx, op):
in_chunk = op.inputs[0]
input_keys, input_indexes = get_shuffle_input_keys_idxes(in_chunk)
inputs = list(zip(*(ctx[(input_key, str(op.aggregate_id))]
for input_key in input_keys)))
flatten, device_id, xp = as_same_device(
list(itertools.chain(*inputs)), device=op.device, ret_extra=True)
n_ret = len(inputs[0])
inputs = [flatten[i * n_ret: (i + 1) * n_ret] for i in range(len(inputs))]
inputs_iter = iter(inputs)
unique_arrays = next(inputs_iter)
indices_arrays = next(inputs_iter) if op.return_index else None
inverse_arrays = next(inputs_iter) if op.return_inverse else None
counts_arrays = next(inputs_iter) if op.return_counts else None
with device(device_id):
ar = xp.concatenate(unique_arrays, axis=op.axis)
result_return_inverse = op.return_inverse or op.return_counts
axis = op.axis
if ar.size == 0 or ar.shape[axis] == 0:
# empty array on the axis
results = [xp.empty(ar.shape)]
i = 1
for it in (op.return_index, op.return_inverse, op.return_counts):
if it:
results.append(xp.empty([], dtype=op.outputs[i].dtype))
i += 1
results = tuple(results)
else:
results = xp.unique(ar, return_index=op.return_index,
return_inverse=result_return_inverse,
axis=axis)
results = (results,) if not isinstance(results, tuple) else results
results_iter = iter(results)
outputs_iter = iter(op.outputs)
# unique array
ctx[next(outputs_iter).key] = next(results_iter)
if op.output_limit == 1:
return
# calc indices
if op.return_index:
ctx[next(outputs_iter).key] = \
xp.concatenate(indices_arrays)[next(results_iter)]
# calc inverse
try:
inverse_result = next(results_iter)
if op.return_inverse:
unique_sizes = tuple(ua.shape[op.axis] for ua in unique_arrays)
cum_unique_sizes = np.cumsum((0,) + unique_sizes)
indices_out_key = next(outputs_iter).key
for i, inverse_array in enumerate(inverse_arrays):
p = inverse_result[cum_unique_sizes[i]: cum_unique_sizes[i + 1]]
r = xp.empty(inverse_array.shape, dtype=inverse_array.dtype)
if inverse_array.size > 0:
r[0] = inverse_array[0]
r[1] = p[inverse_array[1]]
# return unique length and
ctx[(indices_out_key, str(input_indexes[i][op.axis]))] = \
results[0].shape[op.axis], r
# calc counts
if op.return_counts:
result_counts = xp.zeros(results[0].shape[op.axis], dtype=int)
t = np.stack([inverse_result, np.concatenate(counts_arrays)])
def acc(a):
i, v = a
result_counts[i] += v
np.apply_along_axis(acc, 0, t)
ctx[next(outputs_iter).key] = xp.asarray(result_counts)
except StopIteration:
pass
@classmethod
def execute(cls, ctx, op):
if op.stage == OperandStage.map:
cls._execute_map(ctx, op)
elif op.stage == OperandStage.reduce:
cls._execute_reduce(ctx, op)
else:
(ar,), device_id, xp = as_same_device(
[ctx[c.key] for c in op.inputs], device=op.device, ret_extra=True)
with device(device_id):
kw = dict(return_index=op.return_index,
return_inverse=op.return_inverse,
return_counts=op.return_counts)
if ar.dtype != object:
# axis cannot pass when dtype is object
kw['axis'] = op.axis
results = xp.unique(ar, **kw)
outs = op.outputs
if len(outs) == 1:
ctx[outs[0].key] = results
return
assert len(outs) == len(results)
for out, result in zip(outs, results):
ctx[out.key] = result
class TensorUniqueInverseReduce(TensorMapReduceOperand, TensorOperandMixin):
_op_type_ = OperandDef.UNIQUE_INVERSE_REDUCE
def __init__(self, shuffle_key=None, dtype=None, gpu=None, **kw):
super().__init__(_stage=OperandStage.reduce, _shuffle_key=shuffle_key,
_dtype=dtype, _gpu=gpu, **kw)
@classmethod
def execute(cls, ctx, op):
out = op.outputs[0]
input_keys, _ = get_shuffle_input_keys_idxes(op.inputs[0])
inputs = [ctx[(inp_key, op.shuffle_key)] for inp_key in input_keys]
unique_sizes = [inp[0] for inp in inputs]
cum_unique_sizes = np.cumsum([0] + unique_sizes)
invs, device_id, xp = as_same_device([inp[1] for inp in inputs],
device=op.device, ret_extra=True)
with device(device_id):
ret = xp.empty(out.shape, dtype=out.dtype)
for i, inv in enumerate(invs):
ret[inv[0]] = cum_unique_sizes[i] + inv[1]
ctx[out.key] = ret
[docs]def unique(ar, return_index=False, return_inverse=False, return_counts=False,
axis=None, aggregate_size=None):
"""
Find the unique elements of a tensor.
Returns the sorted unique elements of a tensor. There are three optional
outputs in addition to the unique elements:
* the indices of the input tensor that give the unique values
* the indices of the unique tensor that reconstruct the input tensor
* the number of times each unique value comes up in the input tensor
Parameters
----------
ar : array_like
Input tensor. Unless `axis` is specified, this will be flattened if it
is not already 1-D.
return_index : bool, optional
If True, also return the indices of `ar` (along the specified axis,
if provided, or in the flattened tensor) that result in the unique tensor.
return_inverse : bool, optional
If True, also return the indices of the unique tensor (for the specified
axis, if provided) that can be used to reconstruct `ar`.
return_counts : bool, optional
If True, also return the number of times each unique item appears
in `ar`.
axis : int or None, optional
The axis to operate on. If None, `ar` will be flattened. If an integer,
the subarrays indexed by the given axis will be flattened and treated
as the elements of a 1-D tensor with the dimension of the given axis,
see the notes for more details. Object tensors or structured tensors
that contain objects are not supported if the `axis` kwarg is used. The
default is None.
aggregate_size: int or None, optional
How many chunks will be after unique, default as #input.chunks / options.combine_size
Returns
-------
unique : Tensor
The sorted unique values.
unique_indices : Tensor, optional
The indices of the first occurrences of the unique values in the
original tensor. Only provided if `return_index` is True.
unique_inverse : Tensor, optional
The indices to reconstruct the original tensor from the
unique tensor. Only provided if `return_inverse` is True.
unique_counts : Tensor, optional
The number of times each of the unique values comes up in the
original tensor. Only provided if `return_counts` is True.
Examples
--------
>>> import mars.tensor as mt
>>> mt.unique([1, 1, 2, 2, 3, 3]).execute()
array([1, 2, 3])
>>> a = mt.array([[1, 1], [2, 3]])
>>> mt.unique(a).execute()
array([1, 2, 3])
Return the unique rows of a 2D tensor
>>> a = mt.array([[1, 0, 0], [1, 0, 0], [2, 3, 4]])
>>> mt.unique(a, axis=0).execute()
array([[1, 0, 0], [2, 3, 4]])
Return the indices of the original tensor that give the unique values:
>>> a = mt.array(['a', 'b', 'b', 'c', 'a'])
>>> u, indices = mt.unique(a, return_index=True)
>>> u.execute()
array(['a', 'b', 'c'],
dtype='|S1')
>>> indices.execute()
array([0, 1, 3])
>>> a[indices].execute()
array(['a', 'b', 'c'],
dtype='|S1')
Reconstruct the input array from the unique values:
>>> a = mt.array([1, 2, 6, 4, 2, 3, 2])
>>> u, indices = mt.unique(a, return_inverse=True)
>>> u.execute()
array([1, 2, 3, 4, 6])
>>> indices.execute()
array([0, 1, 4, 3, 1, 2, 1])
>>> u[indices].execute()
array([1, 2, 6, 4, 2, 3, 2])
"""
op = TensorUnique(return_index=return_index, return_inverse=return_inverse,
return_counts=return_counts, axis=axis,
aggregate_size=aggregate_size)
return op(ar)