Source code for mars.dataframe.merge.merge

# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import numpy as np
import pandas as pd

from ... import opcodes as OperandDef
from ...core import OutputType
from ...core.operand import OperandStage, MapReduceOperand
from ...serialization.serializables import (
    AnyField,
    BoolField,
    StringField,
    TupleField,
    KeyField,
    Int32Field,
)
from ..operands import DataFrameOperand, DataFrameOperandMixin, DataFrameShuffleProxy
from ..utils import (
    build_concatenated_rows_frame,
    build_df,
    parse_index,
    hash_dataframe_on,
    infer_index_value,
)

import logging

logger = logging.getLogger(__name__)


class DataFrameMergeAlign(MapReduceOperand, DataFrameOperandMixin):
    _op_type_ = OperandDef.DATAFRAME_SHUFFLE_MERGE_ALIGN

    _index_shuffle_size = Int32Field("index_shuffle_size")
    _shuffle_on = AnyField("shuffle_on")

    _input = KeyField("input")

    def __init__(self, index_shuffle_size=None, shuffle_on=None, **kw):
        super().__init__(
            _index_shuffle_size=index_shuffle_size,
            _shuffle_on=shuffle_on,
            _output_types=[OutputType.dataframe],
            **kw
        )

    @property
    def index_shuffle_size(self):
        return self._index_shuffle_size

    @property
    def shuffle_on(self):
        return self._shuffle_on

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        self._input = self._inputs[0]

    @classmethod
    def execute_map(cls, ctx, op):
        chunk = op.outputs[0]
        df = ctx[op.inputs[0].key]
        shuffle_on = op.shuffle_on

        if shuffle_on is not None:
            # shuffle on field may be resident in index
            to_reset_index_names = []
            if not isinstance(shuffle_on, (list, tuple)):
                if shuffle_on not in df.dtypes:
                    to_reset_index_names.append(shuffle_on)
            else:
                for son in shuffle_on:
                    if son not in df.dtypes:
                        to_reset_index_names.append(shuffle_on)
            if len(to_reset_index_names) > 0:
                df = df.reset_index(to_reset_index_names)

        filters = hash_dataframe_on(df, shuffle_on, op.index_shuffle_size)

        # shuffle on index
        for index_idx, index_filter in enumerate(filters):
            reducer_index = (index_idx, chunk.index[1])
            if index_filter is not None and index_filter is not list():
                ctx[chunk.key, reducer_index] = df.iloc[index_filter]
            else:
                ctx[chunk.key, reducer_index] = None

    @classmethod
    def execute_reduce(cls, ctx, op: "DataFrameMergeAlign"):
        chunk = op.outputs[0]
        input_idx_to_df = dict(op.iter_mapper_data_with_index(ctx))
        row_idxes = sorted({idx[0] for idx in input_idx_to_df})

        res = []
        for row_idx in row_idxes:
            row_df = input_idx_to_df.get((row_idx, 0), None)
            if row_df is not None:
                res.append(row_df)
        ctx[chunk.key] = pd.concat(res, axis=0)

    @classmethod
    def execute(cls, ctx, op):
        if op.stage == OperandStage.map:
            cls.execute_map(ctx, op)
        else:
            cls.execute_reduce(ctx, op)


class _DataFrameMergeBase(DataFrameOperand, DataFrameOperandMixin):
    _how = StringField("how")
    _on = AnyField("on")
    _left_on = AnyField("left_on")
    _right_on = AnyField("right_on")
    _left_index = BoolField("left_index")
    _right_index = BoolField("right_index")
    _sort = BoolField("sort")
    _suffixes = TupleField("suffixes")
    _copy = BoolField("copy")
    _indicator = BoolField("indicator")
    _validate = AnyField("validate")

    def __init__(
        self,
        how=None,
        on=None,
        left_on=None,
        right_on=None,
        left_index=False,
        right_index=False,
        sort=False,
        suffixes=("_x", "_y"),
        copy=True,
        indicator=False,
        validate=None,
        sparse=False,
        output_types=None,
        **kw
    ):
        super().__init__(
            _how=how,
            _on=on,
            _left_on=left_on,
            _right_on=right_on,
            _left_index=left_index,
            _right_index=right_index,
            _sort=sort,
            _suffixes=suffixes,
            _copy=copy,
            _indicator=indicator,
            _validate=validate,
            _output_types=output_types,
            sparse=sparse,
            **kw
        )

    @property
    def how(self):
        return self._how

    @property
    def on(self):
        return self._on

    @property
    def left_on(self):
        return self._left_on

    @property
    def right_on(self):
        return self._right_on

    @property
    def left_index(self):
        return self._left_index

    @property
    def right_index(self):
        return self._right_index

    @property
    def sort(self):
        return self._sort

    @property
    def suffixes(self):
        return self._suffixes

    @property
    def copy_(self):
        return self._copy

    @property
    def indicator(self):
        return self._indicator

    @property
    def validate(self):
        return self._validate

    def __call__(self, left, right):
        empty_left, empty_right = build_df(left), build_df(right)
        # this `merge` will check whether the combination of those arguments is valid
        merged = empty_left.merge(
            empty_right,
            how=self.how,
            on=self.on,
            left_on=self.left_on,
            right_on=self.right_on,
            left_index=self.left_index,
            right_index=self.right_index,
            sort=self.sort,
            suffixes=self.suffixes,
            copy=self.copy_,
            indicator=self.indicator,
            validate=self.validate,
        )

        # the `index_value` doesn't matter.
        index_tokenize_objects = [
            left,
            right,
            self.how,
            self.left_on,
            self.right_on,
            self.left_index,
            self.right_index,
        ]
        return self.new_dataframe(
            [left, right],
            shape=(np.nan, merged.shape[1]),
            dtypes=merged.dtypes,
            index_value=parse_index(merged.index, *index_tokenize_objects),
            columns_value=parse_index(merged.columns, store_data=True),
        )


class DataFrameShuffleMerge(_DataFrameMergeBase):
    _op_type_ = OperandDef.DATAFRAME_SHUFFLE_MERGE

    def __init__(self, **kw):
        super().__init__(**kw)

    @classmethod
    def _gen_shuffle_chunks(cls, op, out_shape, shuffle_on, df):
        # gen map chunks
        map_chunks = []
        for chunk in df.chunks:
            map_op = DataFrameMergeAlign(
                stage=OperandStage.map,
                shuffle_on=shuffle_on,
                sparse=chunk.issparse(),
                index_shuffle_size=out_shape[0],
            )
            map_chunks.append(
                map_op.new_chunk(
                    [chunk],
                    shape=(np.nan, np.nan),
                    dtypes=chunk.dtypes,
                    index=chunk.index,
                    index_value=chunk.index_value,
                    columns_value=chunk.columns_value,
                )
            )

        proxy_chunk = DataFrameShuffleProxy(
            output_types=[OutputType.dataframe]
        ).new_chunk(
            map_chunks,
            shape=(),
            dtypes=df.dtypes,
            index_value=df.index_value,
            columns_value=df.columns_value,
        )

        # gen reduce chunks
        reduce_chunks = []
        for out_idx in itertools.product(*(range(s) for s in out_shape)):
            reduce_op = DataFrameMergeAlign(
                stage=OperandStage.reduce, sparse=proxy_chunk.issparse()
            )
            reduce_chunks.append(
                reduce_op.new_chunk(
                    [proxy_chunk],
                    shape=(np.nan, np.nan),
                    dtypes=proxy_chunk.dtypes,
                    index=out_idx,
                    index_value=proxy_chunk.index_value,
                    columns_value=proxy_chunk.columns_value,
                )
            )
        return reduce_chunks

    @classmethod
    def _tile_one_chunk(cls, op, left, right):
        df = op.outputs[0]
        if len(left.chunks) == 1 and len(right.chunks) == 1:
            merge_op = op.copy().reset_key()
            out_chunk = merge_op.new_chunk(
                [left.chunks[0], right.chunks[0]],
                shape=df.shape,
                index=left.chunks[0].index,
                index_value=df.index_value,
                dtypes=df.dtypes,
                columns_value=df.columns_value,
            )
            out_chunks = [out_chunk]
            nsplits = ((np.nan,), (df.shape[1],))
        elif len(left.chunks) == 1:
            out_chunks = []
            left_chunk = left.chunks[0]
            for c in right.chunks:
                merge_op = op.copy().reset_key()
                out_chunk = merge_op.new_chunk(
                    [left_chunk, c],
                    shape=(np.nan, df.shape[1]),
                    index=c.index,
                    index_value=infer_index_value(
                        left_chunk.index_value, c.index_value
                    ),
                    dtypes=df.dtypes,
                    columns_value=df.columns_value,
                )
                out_chunks.append(out_chunk)
            nsplits = ((np.nan,) * len(right.chunks), (df.shape[1],))
        else:
            out_chunks = []
            right_chunk = right.chunks[0]
            for c in left.chunks:
                merge_op = op.copy().reset_key()
                out_chunk = merge_op.new_chunk(
                    [c, right_chunk],
                    shape=(np.nan, df.shape[1]),
                    index=c.index,
                    index_value=infer_index_value(
                        right_chunk.index_value, c.index_value
                    ),
                    dtypes=df.dtypes,
                    columns_value=df.columns_value,
                )
                out_chunks.append(out_chunk)
            nsplits = ((np.nan,) * len(left.chunks), (df.shape[1],))

        new_op = op.copy()
        return new_op.new_dataframes(
            op.inputs,
            df.shape,
            nsplits=nsplits,
            chunks=out_chunks,
            dtypes=df.dtypes,
            index_value=df.index_value,
            columns_value=df.columns_value,
        )

    @classmethod
    def tile(cls, op):
        df = op.outputs[0]
        left = build_concatenated_rows_frame(op.inputs[0])
        right = build_concatenated_rows_frame(op.inputs[1])

        if len(left.chunks) == 1 or len(right.chunks) == 1:
            return cls._tile_one_chunk(op, left, right)

        left_row_chunk_size = left.chunk_shape[0]
        right_row_chunk_size = right.chunk_shape[0]
        out_row_chunk_size = max(left_row_chunk_size, right_row_chunk_size)

        out_chunk_shape = (out_row_chunk_size, 1)
        nsplits = [[np.nan for _ in range(out_row_chunk_size)], [df.shape[1]]]

        left_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on)
        right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)

        # do shuffle
        left_chunks = cls._gen_shuffle_chunks(op, out_chunk_shape, left_on, left)
        right_chunks = cls._gen_shuffle_chunks(op, out_chunk_shape, right_on, right)

        out_chunks = []
        for left_chunk, right_chunk in zip(left_chunks, right_chunks):
            merge_op = op.copy().reset_key()
            out_chunk = merge_op.new_chunk(
                [left_chunk, right_chunk],
                shape=(np.nan, df.shape[1]),
                index=left_chunk.index,
                index_value=infer_index_value(
                    left_chunk.index_value, right_chunk.index_value
                ),
                dtypes=df.dtypes,
                columns_value=df.columns_value,
            )
            out_chunks.append(out_chunk)

        new_op = op.copy()
        return new_op.new_dataframes(
            op.inputs,
            df.shape,
            nsplits=tuple(tuple(ns) for ns in nsplits),
            chunks=out_chunks,
            dtypes=df.dtypes,
            index_value=df.index_value,
            columns_value=df.columns_value,
        )

    @classmethod
    def execute(cls, ctx, op):
        chunk = op.outputs[0]
        left, right = ctx[op.inputs[0].key], ctx[op.inputs[1].key]

        def execute_merge(x, y):
            if not op.gpu:
                kwargs = dict(
                    copy=op.copy, validate=op.validate, indicator=op.indicator
                )
            else:  # pragma: no cover
                # cudf doesn't support 'validate' and 'copy'
                kwargs = dict(indicator=op.indicator)
            return x.merge(
                y,
                how=op.how,
                on=op.on,
                left_on=op.left_on,
                right_on=op.right_on,
                left_index=op.left_index,
                right_index=op.right_index,
                sort=op.sort,
                suffixes=op.suffixes,
                **kwargs
            )

        # workaround for: https://github.com/pandas-dev/pandas/issues/27943
        try:
            r = execute_merge(left, right)
        except ValueError:
            r = execute_merge(left.copy(deep=True), right.copy(deep=True))

        # make sure column's order
        if not all(
            n1 == n2 for n1, n2 in zip(chunk.columns_value.to_pandas(), r.columns)
        ):
            r = r[list(chunk.columns_value.to_pandas())]
        ctx[chunk.key] = r


def _prepare_shuffle_on(use_index, side_on, on):
    # consistent with pandas: `left_index` precedes `left_on` and `right_index` precedes `right_on`
    if use_index:
        # `None` means we will shuffle on df.index.
        return None
    elif side_on is not None:
        return side_on
    else:
        return on


[docs]def merge( df, right, how="inner", on=None, left_on=None, right_on=None, left_index=False, right_index=False, sort=False, suffixes=("_x", "_y"), copy=True, indicator=False, strategy=None, validate=None, ): if strategy is not None and strategy != "shuffle": raise NotImplementedError("Only shuffle merge is supported") op = DataFrameShuffleMerge( how=how, on=on, left_on=left_on, right_on=right_on, left_index=left_index, right_index=right_index, sort=sort, suffixes=suffixes, copy=copy, indicator=indicator, validate=validate, output_types=[OutputType.dataframe], ) return op(df, right)
def join( df, other, on=None, how="left", lsuffix="", rsuffix="", sort=False, strategy=None ): return merge( df, other, left_on=on, how=how, left_index=on is None, right_index=True, suffixes=(lsuffix, rsuffix), sort=sort, strategy=strategy, )