Source code for mars.learn.utils.shuffle

# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
from collections import defaultdict
from collections.abc import Iterable
from functools import reduce

import numpy as np
import pandas as pd

from ... import opcodes as OperandDef
from ...core import get_output_types, recursive_tile
from ...core.operand import OperandStage, MapReduceOperand
from ...dataframe.utils import parse_index
from ...lib import sparse
from ...serialization.serializables import FieldTypes, TupleField, KeyField
from ...tensor.utils import (
    validate_axis,
    check_random_state,
    gen_random_seeds,
    decide_unify_split,
)
from ...tensor.array_utils import get_array_module
from ...utils import tokenize, lazy_import, has_unknown_shape
from ...core import ExecutableTuple
from ..operands import LearnOperandMixin, OutputType, LearnShuffleProxy
from ..utils import convert_to_tensor_or_dataframe


cudf = lazy_import("cudf")


def _shuffle_index_value(op, index_value, chunk_index=None):
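    # after a shuffle the row order is unknown at tile time, so return an
    # empty index of the same dtype, keyed deterministically for this chunk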
    key = tokenize((op._values_, chunk_index, index_value.key))
    return parse_index(pd.Index([], index_value.to_pandas().dtype), key=key)


def _safe_slice(obj, slc, output_type):
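    # tensors support plain positional indexing; pandas/cudf objects need .iloc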
    if output_type == OutputType.tensor:
        return obj[slc]
    else:
        return obj.iloc[slc]


class LearnShuffle(MapReduceOperand, LearnOperandMixin):
    _op_type_ = OperandDef.PERMUTATION

    _axes = TupleField("axes", FieldTypes.int32)
    _seeds = TupleField("seeds", FieldTypes.uint32)

    _input = KeyField("input")
    _reduce_sizes = TupleField("reduce_sizes", FieldTypes.uint32)

    def __init__(
        self, axes=None, seeds=None, output_types=None, reduce_sizes=None, **kw
    ):
        super().__init__(
            _axes=axes,
            _seeds=seeds,
            _output_types=output_types,
            _reduce_sizes=reduce_sizes,
            **kw,
        )

    @property
    def axes(self):
        return self._axes

    @property
    def seeds(self):
        return self._seeds

    @property
    def input(self):
        return self._input

    @property
    def reduce_sizes(self):
        return self._reduce_sizes

    @property
    def output_limit(self):
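        # map/reduce stage chunks emit a single output; only the top-level
        # operand produces one output tileable per input array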
        if self.stage is None:
            return len(self.output_types)
        return 1

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        self._input = self._inputs[0]

    def __call__(self, arrays):
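        # build one output tileable per input array from this single operand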
        params = self._calc_params([ar.params for ar in arrays])
        return self.new_tileables(arrays, kws=params)

    def _shuffle_index_value(self, index_value):
        return _shuffle_index_value(self, index_value)

    def _shuffle_dtypes(self, dtypes):
        seed = self.seeds[self.axes.index(1)]
        rs = np.random.RandomState(seed)
        shuffled_dtypes = dtypes[rs.permutation(np.arange(len(dtypes)))]
        return shuffled_dtypes

    def _calc_params(self, params):
        axes = set(self.axes)
        for i, output_type, param in zip(itertools.count(0), self.output_types, params):
            if output_type == OutputType.dataframe:
                if 0 in axes:
                    param["index_value"] = self._shuffle_index_value(
                        param["index_value"]
                    )
                if 1 in axes:
                    dtypes = param["dtypes"] = self._shuffle_dtypes(param["dtypes"])
                    param["columns_value"] = parse_index(dtypes.index, store_data=True)
            elif output_type == OutputType.series:
                if 0 in axes:
                    param["index_value"] = self._shuffle_index_value(
                        param["index_value"]
                    )
            param["_position_"] = i
        return params

    @staticmethod
    def _safe_rechunk(tileable, ax_nsplit):
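        # rechunk only when the current splits differ from the target ones,
        # avoiding a redundant rechunk node in the graph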
        do_rechunk = False
        for ax, nsplit in ax_nsplit.items():
            if ax >= tileable.ndim:
                continue
            if tuple(tileable.nsplits[ax]) != tuple(nsplit):
                do_rechunk = True
        if do_rechunk:
            return (yield from recursive_tile(tileable.rechunk(ax_nsplit)))
        else:
            return tileable

    @classmethod
    def _calc_chunk_params(
        cls,
        in_chunk,
        axes,
        chunk_shape,
        output,
        output_type,
        chunk_op,
        no_shuffle: bool,
    ):
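        # once rows are redistributed across more than one chunk, the length
        # along each shuffled axis is unknown until execution, hence np.nan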
        params = {"index": in_chunk.index}
        if output_type == OutputType.tensor:
            shape_c = list(in_chunk.shape)
            for ax in axes:
                if not no_shuffle and chunk_shape[ax] > 1:
                    shape_c[ax] = np.nan
            params["shape"] = tuple(shape_c)
            params["dtype"] = in_chunk.dtype
            params["order"] = output.order
        elif output_type == OutputType.dataframe:
            shape_c = list(in_chunk.shape)
            if 0 in axes:
                if not no_shuffle and chunk_shape[0] > 1:
                    shape_c[0] = np.nan
            params["shape"] = tuple(shape_c)
            if 1 not in axes:
                params["dtypes"] = in_chunk.dtypes
                params["columns_value"] = in_chunk.columns_value
            else:
                params["dtypes"] = output.dtypes
                params["columns_value"] = output.columns_value
            params["index_value"] = _shuffle_index_value(
                chunk_op, in_chunk.index_value, in_chunk.index
            )
        else:
            assert output_type == OutputType.series
            if no_shuffle:
                params["shape"] = in_chunk.shape
            else:
                params["shape"] = (np.nan,)
            params["name"] = in_chunk.name
            params["index_value"] = _shuffle_index_value(
                chunk_op, in_chunk.index_value, in_chunk.index
            )
            params["dtype"] = in_chunk.dtype
        return params

    @classmethod
    def tile(cls, op):
        inputs = op.inputs
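        # unifying splits requires concrete chunk shapes; yield to re-tile
        # after upstream chunks have executed if any shape is unknown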
        if has_unknown_shape(inputs):
            yield
        axis_to_nsplits = defaultdict(list)
        has_dataframe = any(
            output_type == OutputType.dataframe for output_type in op.output_types
        )
        for ax in op.axes:
            if has_dataframe and ax == 1:
                # if a DataFrame is present, allow only one chunk along the
                # columns axis to keep the columns consistent
                axis_to_nsplits[ax].append((inputs[0].shape[ax],))
                continue
            for inp in inputs:
                if ax < inp.ndim:
                    axis_to_nsplits[ax].append(inp.nsplits[ax])
        ax_nsplit = {ax: decide_unify_split(*ns) for ax, ns in axis_to_nsplits.items()}
        rechunked_inputs = []
        for inp in inputs:
            inp_ax_nsplit = {ax: ns for ax, ns in ax_nsplit.items() if ax < inp.ndim}
            inp = yield from cls._safe_rechunk(inp, inp_ax_nsplit)
            rechunked_inputs.append(inp)
        inputs = rechunked_inputs

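        # generate independent per-chunk seeds for both phases: mapper seeds
        # drive the random assignment of rows to reducers, reducer seeds drive
        # the final permutation inside each reducer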
        mapper_seeds = [None] * len(op.axes)
        reducer_seeds = [None] * len(op.axes)
        for i, ax in enumerate(op.axes):
            rs = np.random.RandomState(op.seeds[i])
            size = len(ax_nsplit[ax])
            if size > 1:
                mapper_seeds[i] = gen_random_seeds(size, rs)
                reducer_seeds[i] = gen_random_seeds(size, rs)
            else:
                mapper_seeds[i] = reducer_seeds[i] = [op.seeds[i]] * size
        out_chunks = []
        out_nsplits = []
        for output_type, inp, oup in zip(op.output_types, inputs, op.outputs):
            inp_axes = tuple(ax for ax in op.axes if ax < inp.ndim)
            reduce_sizes = tuple(inp.chunk_shape[ax] for ax in inp_axes)
            output_types = [output_type]

            if len(inp_axes) == 0:
                continue

            nsplits = list(inp.nsplits)
            for ax in inp_axes:
                cs = len(nsplits[ax])
                if cs > 1:
                    nsplits[ax] = (np.nan,) * cs
            out_nsplits.append(tuple(nsplits))

            if all(reduce_size == 1 for reduce_size in reduce_sizes):
                # every shuffled axis has a single chunk, so each chunk can
                # be permuted locally without a shuffle
                chunks = []
                for c in inp.chunks:
                    chunk_op = LearnShuffle(
                        axes=inp_axes,
                        seeds=op.seeds[: len(inp_axes)],
                        output_types=output_types,
                    )
                    params = cls._calc_chunk_params(
                        c, inp_axes, inp.chunk_shape, oup, output_type, chunk_op, True
                    )
                    out_chunk = chunk_op.new_chunk([c], kws=[params])
                    chunks.append(out_chunk)
                out_chunks.append(chunks)
                continue

            if inp.ndim > 1:
                left_chunk_shape = [
                    s for ax, s in enumerate(inp.chunk_shape) if ax not in inp_axes
                ]
                idx_iter = itertools.product(*[range(s) for s in left_chunk_shape])
            else:
                idx_iter = [()]
            reduce_chunks = []
            out_chunks.append(reduce_chunks)
            for idx in idx_iter:
                map_chunks = []
                for reducer_inds in itertools.product(
                    *[range(s) for s in reduce_sizes]
                ):
                    inp_index = list(idx)
                    for ax, reducer_ind in zip(inp_axes, reducer_inds):
                        inp_index.insert(ax, reducer_ind)
                    inp_index = tuple(inp_index)
                    in_chunk = inp.cix[inp_index]
                    params = in_chunk.params
                    map_chunk_op = LearnShuffle(
                        stage=OperandStage.map,
                        output_types=output_types,
                        axes=inp_axes,
                        seeds=tuple(
                            mapper_seeds[j][in_chunk.index[ax]]
                            for j, ax in enumerate(inp_axes)
                        ),
                        reduce_sizes=reduce_sizes,
                    )
                    map_chunk = map_chunk_op.new_chunk([in_chunk], **params)
                    map_chunks.append(map_chunk)

                map_chunk_kw = {}
                if output_type == OutputType.tensor:
                    map_chunk_kw = {"dtype": inp.dtype, "shape": ()}
                proxy_chunk = LearnShuffleProxy(
                    _tileable_keys=[inp.key], output_types=[output_type]
                ).new_chunk(map_chunks, **map_chunk_kw)

                reduce_axes = tuple(
                    ax for j, ax in enumerate(inp_axes) if reduce_sizes[j] > 1
                )
                reduce_sizes_ = tuple(rs for rs in reduce_sizes if rs > 1)
                for c in map_chunks:
                    chunk_op = LearnShuffle(
                        stage=OperandStage.reduce,
                        output_types=output_types,
                        axes=reduce_axes,
                        seeds=tuple(
                            reducer_seeds[j][c.index[ax]]
                            for j, ax in enumerate(inp_axes)
                            if reduce_sizes[j] > 1
                        ),
                        reduce_sizes=reduce_sizes_,
                        n_reducers=len(map_chunks),
                    )
                    params = cls._calc_chunk_params(
                        c, inp_axes, inp.chunk_shape, oup, output_type, chunk_op, False
                    )
                    reduce_chunk = chunk_op.new_chunk([proxy_chunk], kws=[params])
                    reduce_chunks.append(reduce_chunk)

        new_op = op.copy()
        params = [out.params for out in op.outputs]
        if len(out_chunks) < len(op.outputs):
            # inputs for which every shuffle axis is >= their ndim were
            # skipped above; pass their chunks through unchanged
            for i, inp in enumerate(op.inputs):
                if all(ax >= inp.ndim for ax in op.axes):
                    out_chunks.insert(i, inp.chunks)
                    out_nsplits.insert(i, inp.nsplits)
            assert len(out_chunks) == len(op.outputs)
        for i, param, chunks, ns in zip(
            itertools.count(), params, out_chunks, out_nsplits
        ):
            param["chunks"] = chunks
            param["nsplits"] = ns
            param["_position_"] = i
        return new_op.new_tileables(op.inputs, kws=params)

    @classmethod
    def execute_single(cls, ctx, op):
        x = ctx[op.inputs[0].key]
        conv = lambda x: x
        if op.output_types[0] == OutputType.tensor:
            xp = get_array_module(x)
            if xp is sparse:
                conv = lambda x: x
            else:
                conv = (
                    xp.ascontiguousarray
                    if op.outputs[0].order.value == "C"
                    else xp.asfortranarray
                )

        for axis, seed in zip(op.axes, op.seeds):
            size = x.shape[axis]
            ind = np.random.RandomState(seed).permutation(np.arange(size))
            slc = (slice(None),) * axis + (ind,)
            x = _safe_slice(x, slc, op.output_types[0])

        ctx[op.outputs[0].key] = conv(x)

    @classmethod
    def execute_map(cls, ctx, op):
        out = op.outputs[0]
        x = ctx[op.input.key]
        axes, seeds, reduce_sizes = op.axes, op.seeds, op.reduce_sizes
        if 1 in set(op.reduce_sizes):
            # some shuffle axis has only one chunk (reduce size == 1);
            # its shuffle can be done locally in the map phase
            inds = [slice(None) for _ in range(x.ndim)]
            extra_axes, extra_seeds, extra_reduce_sizes = [], [], []
            for ax, seed, reduce_size in zip(axes, seeds, reduce_sizes):
                rs = np.random.RandomState(seed)
                if reduce_size == 1:
                    inds[ax] = rs.permutation(np.arange(x.shape[ax]))
                else:
                    extra_axes.append(ax)
                    extra_seeds.append(seed)
                    extra_reduce_sizes.append(reduce_size)
            # axes whose reduce size is 1 are shuffled here in the map phase
            x = _safe_slice(x, tuple(inds), op.output_types[0])
            axes, seeds, reduce_sizes = extra_axes, extra_seeds, extra_reduce_sizes

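        # for the remaining axes, randomly assign each row of this chunk to
        # one of the reducer buckets along that axis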
        to_hash_inds = []
        for ax, seed, reduce_size in zip(axes, seeds, reduce_sizes):
            rs = np.random.RandomState(seed)
            to_hash_inds.append(rs.randint(reduce_size, size=x.shape[ax]))

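        # emit one data partition per reducer, tagged with this chunk's index
        # so that the reducer can place it correctly in its grid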
        for reduce_index in itertools.product(*(range(rs) for rs in reduce_sizes)):
            index = list(out.index)
            for ax, ind in zip(axes, reduce_index):
                index[ax] = ind
            selected = x
            for ax, to_hash_ind in zip(axes, to_hash_inds):
                slc = (slice(None),) * ax + (to_hash_ind == index[ax],)
                selected = _safe_slice(selected, slc, op.output_types[0])
            ctx[out.key, tuple(index)] = (ctx.get_current_chunk().index, selected)

    @classmethod
    def execute_reduce(cls, ctx, op: "LearnShuffle"):
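        # arrange the partitions received from all mappers into an n-d grid,
        # concatenate along the shuffled axes, then apply this reducer's own
        # permutation to finalize the row order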
        inputs_grid = np.empty(op.reduce_sizes, dtype=object)
        for input_index, inp in op.iter_mapper_data(ctx):
            reduce_index = tuple(input_index[ax] for ax in op.axes)
            inputs_grid[reduce_index] = inp
        ret = cls._concat_grid(inputs_grid, op.axes, op.output_types[0])
        for ax, seed in zip(op.axes, op.seeds):
            ind = np.random.RandomState(seed).permutation(np.arange(ret.shape[ax]))
            slc = (slice(None),) * ax + (ind,)
            ret = _safe_slice(ret, slc, op.output_types[0])
        ctx[op.outputs[0].key] = ret

    @classmethod
    def _concat_grid(cls, grid, axes, output_type):
        if output_type == OutputType.tensor:
            return cls._concat_tensor_grid(grid, axes)
        elif output_type == OutputType.dataframe:
            return cls._concat_dataframe_grid(grid, axes)
        else:
            assert output_type == OutputType.series
            return cls._concat_series_grid(grid, axes)

    @classmethod
    def _concat_dataframe_grid(cls, grid, axes):
        xdf = pd if isinstance(grid.ravel()[0], pd.DataFrame) else cudf
        # an axis-1 shuffle on a DataFrame finishes in the map phase (the
        # columns axis has a single chunk), so only one axis can remain here
        assert len(axes) == 1
        return xdf.concat(grid, axis=axes[0])

    @classmethod
    def _concat_series_grid(cls, grid, axes):
        assert axes == (0,) and grid.ndim == 1

        return reduce(lambda a, b: a.append(b), grid)

    @classmethod
    def _concat_tensor_grid(cls, grid, axes):
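        # collapse the chunk grid one shuffled axis at a time, from the last
        # axis back to the first, until a single tensor remains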
        cur = grid
        xp = get_array_module(grid.ravel()[0])
        for ax, i in zip(axes[:0:-1], range(len(axes) - 1, 0, -1)):
            new_shape = grid.shape[:i]
            new_grid = np.empty(new_shape, dtype=object)
            for idx in itertools.product(*(range(s) for s in new_shape)):
                new_grid[idx] = xp.concatenate(cur[idx], axis=ax)
            cur = new_grid
        return xp.concatenate(cur, axis=axes[0])

    @classmethod
    def execute(cls, ctx, op):
        if op.stage == OperandStage.map:
            cls.execute_map(ctx, op)
        elif op.stage == OperandStage.reduce:
            cls.execute_reduce(ctx, op)
        else:
            cls.execute_single(ctx, op)


def shuffle(*arrays, **options):
    arrays = [convert_to_tensor_or_dataframe(ar) for ar in arrays]
    axes = options.pop("axes", (0,))
    if not isinstance(axes, Iterable):
        axes = (axes,)
    elif not isinstance(axes, tuple):
        axes = tuple(axes)
    random_state = check_random_state(options.pop("random_state", None)).to_numpy()
    if options:
        raise TypeError(
            f"shuffle() got an unexpected keyword argument {next(iter(options))}"
        )

    max_ndim = max(ar.ndim for ar in arrays)
    axes = tuple(np.unique([validate_axis(max_ndim, ax) for ax in axes]).tolist())
    seeds = gen_random_seeds(len(axes), random_state)

    # verify shape
    for ax in axes:
        shapes = {ar.shape[ax] for ar in arrays if ax < ar.ndim}
        if len(shapes) > 1:
            raise ValueError(f"arrays do not have same shape on axis {ax}")

    op = LearnShuffle(axes=axes, seeds=seeds, output_types=get_output_types(*arrays))
    shuffled_arrays = op(arrays)
    if len(arrays) == 1:
        return shuffled_arrays[0]
    else:
        return ExecutableTuple(shuffled_arrays)
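
A minimal usage sketch (illustrative, not part of the module source; it
assumes a default local Mars session and the public mars.tensor /
mars.dataframe APIs):

import mars.tensor as mt
import mars.dataframe as md
from mars.learn.utils import shuffle

t = mt.arange(12).reshape(4, 3)
df = md.DataFrame({"a": [1, 2, 3, 4], "b": [5.0, 6.0, 7.0, 8.0]})

# shuffling several arrays together applies the same row permutation
# (seeded by random_state) to each of them along axis 0
t_shuffled, df_shuffled = shuffle(t, df, random_state=0).execute().fetch()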