# Source code for mars.dataframe.reduction.core

# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
from collections import OrderedDict
from typing import NamedTuple, Any, List, Dict, Union, Callable

import numpy as np
import pandas as pd

from ...core import OutputType, Entity, Base
from ...operands import OperandStage
from ...utils import tokenize, is_build_mode, enter_mode, recursive_tile
from ...serialize import BoolField, AnyField, DataTypeField, Int32Field
from ..core import SERIES_TYPE
from ..utils import parse_index, build_df, build_empty_df, build_series, \
    build_empty_series, validate_axis
from ..operands import DataFrameOperandMixin, DataFrameOperand, DATAFRAME_TYPE

class DataFrameReductionOperand(DataFrameOperand):
    """Base operand for reductions over DataFrame / Series objects.

    The serialized fields hold the reduction arguments (``axis``, ``skipna``,
    ``level``, ``numeric_only``, ``bool_only``, ``min_count``,
    ``use_inf_as_na``) plus ``dtype`` and ``combine_size``. Each one is
    exposed through a read-only property of the same name.
    """

    _axis = AnyField('axis')
    _skipna = BoolField('skipna')
    _level = AnyField('level')
    _numeric_only = BoolField('numeric_only')
    _bool_only = BoolField('bool_only')
    _min_count = Int32Field('min_count')
    _use_inf_as_na = BoolField('use_inf_as_na')

    _dtype = DataTypeField('dtype')
    _combine_size = Int32Field('combine_size')

    def __init__(self, axis=None, skipna=None, level=None, numeric_only=None, bool_only=None,
                 min_count=None, stage=None, dtype=None, combine_size=None, gpu=None,
                 sparse=None, output_types=None, use_inf_as_na=None, **kw):
        super().__init__(_axis=axis, _skipna=skipna, _level=level, _numeric_only=numeric_only,
                         _bool_only=bool_only, _min_count=min_count, _stage=stage, _dtype=dtype,
                         _combine_size=combine_size, _gpu=gpu, _sparse=sparse,
                         _output_types=output_types, _use_inf_as_na=use_inf_as_na, **kw)

    # The accessors below must be properties: ``get_reduction_args`` and the
    # mixins in this module read them as plain attributes (``self.skipna``,
    # ``op.axis or 0``, ...). As ordinary methods they would yield bound
    # methods, making every ``is not None`` test true.
    @property
    def axis(self):
        return self._axis

    @property
    def skipna(self):
        return self._skipna

    @property
    def level(self):
        return self._level

    @property
    def numeric_only(self):
        return self._numeric_only

    @property
    def bool_only(self):
        return self._bool_only

    @property
    def min_count(self):
        return self._min_count

    @property
    def dtype(self):
        return self._dtype

    @property
    def combine_size(self):
        return self._combine_size

    @property
    def use_inf_as_na(self):
        return self._use_inf_as_na

    def get_reduction_args(self, axis=None):
        """Build the keyword arguments for the underlying reduction call.

        :param axis: axis to pass on; only included when the first input has
                     more than one dimension.
        :return: dict holding ``skipna`` and, when applicable, ``axis``,
                 ``numeric_only`` and ``bool_only``; entries whose value is
                 ``None`` are dropped.
        """
        args = dict(skipna=self.skipna)
        if self.inputs[0].ndim > 1:
            args['axis'] = axis
        if self.numeric_only is not None:
            args['numeric_only'] = self.numeric_only
        if self.bool_only is not None:
            args['bool_only'] = self.bool_only
        return {k: v for k, v in args.items() if v is not None}

class DataFrameCumReductionOperand(DataFrameOperand):
    """Base operand for cumulative reductions over DataFrame / Series objects.

    Stores ``axis``, ``skipna``, ``use_inf_as_na`` and ``dtype`` as
    serialized fields and exposes each through a read-only property.
    """

    _axis = AnyField('axis')
    _skipna = BoolField('skipna')
    _use_inf_as_na = BoolField('use_inf_as_na')

    _dtype = DataTypeField('dtype')

    def __init__(self, axis=None, skipna=None, dtype=None, gpu=None, sparse=None,
                 output_types=None, use_inf_as_na=None, stage=None, **kw):
        super().__init__(_axis=axis, _skipna=skipna, _dtype=dtype, _gpu=gpu, _sparse=sparse,
                         _output_types=output_types, _stage=stage, _use_inf_as_na=use_inf_as_na, **kw)

    # Properties rather than methods: the cumulative mixin in this module
    # reads ``op.axis`` / ``op.skipna`` / ``op.use_inf_as_na`` as values and
    # forwards them straight to pandas.
    @property
    def axis(self):
        return self._axis

    @property
    def skipna(self):
        return self._skipna

    @property
    def dtype(self):
        return self._dtype

    @property
    def use_inf_as_na(self):
        return self._use_inf_as_na

def _default_agg_fun(value, func_name=None, **kw):
    if value.ndim == 1:
        kw.pop('bool_only', None)
        kw.pop('numeric_only', None)
        return getattr(value, func_name)(**kw)
        return getattr(value, func_name)(**kw)

class DataFrameReductionMixin(DataFrameOperandMixin):
    """Mixin providing call / tile logic for DataFrame and Series reductions.

    Concrete operands are expected to define ``_func_name`` (the name of the
    pandas reduction) and expose the reduction arguments as attributes
    (``axis``, ``skipna``, ``level``, ``numeric_only``, ``bool_only``, ...).
    """

    @classmethod
    def _make_agg_object(cls, op):
        """Wrap the operand's reduction into a callable usable with ``agg``.

        :param op: reduction operand carrying ``_func_name`` and arguments.
        :return: a ``functools.partial`` over ``_default_agg_fun`` whose
                 ``__name__`` is the reduction name.
        """
        func_name = getattr(op, '_func_name')
        # NOTE(review): the tail of this dict was truncated in the source;
        # ``bool_only`` is restored because ``_default_agg_fun`` explicitly
        # strips both ``bool_only`` and ``numeric_only`` for 1-d input.
        kw = dict(skipna=op.skipna, numeric_only=op.numeric_only,
                  bool_only=op.bool_only)
        kw = {k: v for k, v in kw.items() if v is not None}
        fun = functools.partial(_default_agg_fun, func_name=func_name, **kw)
        fun.__name__ = func_name
        return fun

    @classmethod
    def tile(cls, op):
        """Tile the reduction by delegating to ``agg`` on the input.

        :param op: reduction operand to tile.
        :return: single-element list with the recursively tiled result.
        """
        in_df = op.inputs[0]
        out_df = op.outputs[0]

        if isinstance(out_df, SERIES_TYPE):
            output_type = OutputType.series
            # NOTE(review): the index expression was emptied in the source
            # (``index=[]`` cannot hold one value); the output name is
            # assumed to be the intended label - confirm.
            dtypes = pd.Series([out_df.dtype], index=[out_df.name])
            index = out_df.index_value.to_pandas()
        elif out_df.ndim == 1:
            output_type = OutputType.tensor
            dtypes, index = out_df.dtype, None
        else:
            output_type = OutputType.scalar
            dtypes, index = out_df.dtype, None

        out_df = recursive_tile(in_df.agg(
            cls._make_agg_object(op), axis=op.axis or 0, _numeric_only=op.numeric_only,
            _bool_only=op.bool_only, _combine_size=op.combine_size, _output_type=output_type,
            _dtypes=dtypes, _index=index
        ))
        return [out_df]

    def _call_groupby_level(self, df, level):
        """Perform the reduction per ``level`` via groupby aggregation."""
        return df.groupby(level=level).agg(self._make_agg_object(self))

    def _call_dataframe(self, df):
        """Build the output tileable for a DataFrame input.

        The reduction is first run on a mock pandas DataFrame built from
        ``df`` to infer the resulting dtype and index.

        :param df: input DataFrame tileable.
        :return: reduced series, or the groupby result when ``level`` is set.
        :raises NotImplementedError: when ``level`` is given with ``axis == 1``.
        """
        axis = getattr(self, 'axis', None) or 0
        level = getattr(self, 'level', None)
        skipna = getattr(self, 'skipna', None)
        numeric_only = getattr(self, 'numeric_only', None)
        bool_only = getattr(self, 'bool_only', None)
        self._axis = axis = validate_axis(axis, df)
        func_name = getattr(self, '_func_name')

        if level is not None and axis == 1:
            raise NotImplementedError('Not support specify level for axis==1')

        empty_df = build_df(df)
        if func_name == 'count':
            reduced_df = getattr(empty_df, func_name)(axis=axis, level=level, numeric_only=numeric_only)
        elif func_name == 'nunique':
            reduced_df = getattr(empty_df, func_name)(axis=axis)
        elif func_name in ('all', 'any'):
            reduced_df = getattr(empty_df, func_name)(axis=axis, level=level, bool_only=bool_only)
        elif func_name == 'size':
            reduced_df = pd.Series(np.zeros(df.shape[1 - axis]), index=empty_df.columns if axis == 0 else None)
        elif func_name == 'custom_reduction':
            reduced_df = getattr(self, 'custom_reduction').__call_agg__(empty_df)
        elif func_name == 'str_concat':
            # NOTE(review): the lambda body was garbled in the source;
            # ``s.str.cat`` is the assumed string-concatenation call - confirm.
            reduced_df = empty_df.apply(
                lambda s: s.str.cat(**getattr(self, 'get_reduction_args')()), axis=axis)
        else:
            # NOTE(review): trailing arguments were truncated in the source;
            # ``numeric_only`` is restored as the remaining collected option.
            reduced_df = getattr(empty_df, func_name)(axis=axis, level=level, skipna=skipna,
                                                      numeric_only=numeric_only)

        if level is not None:
            return self._call_groupby_level(df[list(reduced_df.columns)], level)

        reduced_shape = (df.shape[0],) if axis == 1 else reduced_df.shape
        return self.new_series([df], shape=reduced_shape, dtype=reduced_df.dtype,
                               index_value=parse_index(reduced_df.index, store_data=axis == 0))

    def _call_series(self, series):
        """Build the output tileable for a Series input.

        :param series: input Series tileable.
        :return: scalar result, or the groupby result when ``level`` is set.
        """
        level = getattr(self, 'level', None)
        axis = getattr(self, 'axis', None)
        skipna = getattr(self, 'skipna', None)
        numeric_only = getattr(self, 'numeric_only', None)
        bool_only = getattr(self, 'bool_only', None)
        self._axis = axis = validate_axis(axis or 0, series)
        func_name = getattr(self, '_func_name')

        if level is not None:
            return self._call_groupby_level(series, level)

        empty_series = build_series(series)
        if func_name == 'count':
            reduced_series = empty_series.count(level=level)
        elif func_name == 'nunique':
            reduced_series = empty_series.nunique()
        elif func_name in ('all', 'any'):
            reduced_series = getattr(empty_series, func_name)(axis=axis, level=level, bool_only=bool_only)
        elif func_name == 'size':
            reduced_series = empty_series.size
        elif func_name == 'custom_reduction':
            reduced_series = getattr(self, 'custom_reduction').__call_agg__(empty_series)
        elif func_name == 'str_concat':
            # NOTE(review): the list element was garbled in the source;
            # ``empty_series.str.cat`` is the assumed call - confirm.
            reduced_series = pd.Series(
                [empty_series.str.cat(**getattr(self, 'get_reduction_args')())])
        else:
            # NOTE(review): trailing arguments were truncated in the source;
            # ``numeric_only`` is restored as the remaining collected option.
            reduced_series = getattr(empty_series, func_name)(axis=axis, level=level, skipna=skipna,
                                                              numeric_only=numeric_only)

        # the mock result is only used to infer the scalar dtype
        return self.new_scalar([series], dtype=np.array(reduced_series).dtype)

    def __call__(self, a):
        """Dispatch on the input type: DataFrame or Series."""
        if isinstance(a, DATAFRAME_TYPE):
            return self._call_dataframe(a)
        else:
            return self._call_series(a)

class DataFrameCumReductionMixin(DataFrameOperandMixin):
    """Mixin providing call / tile / execute logic for cumulative reductions.

    Multi-chunk inputs are processed in two stages: a map stage producing a
    one-row (or one-column) summary per chunk, and a combine stage that
    receives a source chunk together with the summaries preceding it.
    Concrete operands are expected to define ``_func_name``.
    """

    @classmethod
    def _tile_one_chunk(cls, op):
        """Tile an input that consists of a single chunk."""
        df = op.outputs[0]
        params = df.params.copy()

        chk = op.inputs[0].chunks[0]
        chunk_params = {k: v for k, v in chk.params.items()
                        if k in df.params}
        chunk_params['shape'] = df.shape
        chunk_params['index'] = chk.index
        new_chunk_op = op.copy().reset_key()
        chunk = new_chunk_op.new_chunk(op.inputs[0].chunks, kws=[chunk_params])

        new_op = op.copy()
        nsplits = tuple((s,) for s in chunk.shape)
        params['chunks'] = [chunk]
        params['nsplits'] = nsplits
        return new_op.new_tileables(op.inputs, kws=[params])

    @classmethod
    def _build_combine(cls, op, input_chunks, summary_chunks, idx):
        """Create the combine chunk for position ``idx``.

        :param op: operand being tiled.
        :param input_chunks: source chunks along the reduction axis.
        :param summary_chunks: map-stage summaries along the same axis.
        :param idx: position of the chunk to produce.
        :return: combine-stage chunk with the same params as the source chunk.
        """
        c = input_chunks[idx]
        to_concat_chunks = [c]
        # NOTE(review): the loop body was lost in the source. It is restored
        # to match ``_execute_combine``, which reads ``op.inputs[0]`` as the
        # source data and ``op.inputs[1:]`` as the preceding summaries.
        for j in range(idx):
            to_concat_chunks.append(summary_chunks[j])

        new_chunk_op = op.copy().reset_key()
        new_chunk_op._stage = OperandStage.combine
        return new_chunk_op.new_chunk(to_concat_chunks, **c.params)

    @classmethod
    def _tile_dataframe(cls, op):
        """Tile a multi-chunk DataFrame input."""
        in_df = op.inputs[0]
        df = op.outputs[0]

        n_rows, n_cols = in_df.chunk_shape

        # map to get individual results and summaries
        # ``object`` is used instead of the removed ``np.object`` alias
        src_chunks = np.empty(in_df.chunk_shape, dtype=object)
        summary_chunks = np.empty(in_df.chunk_shape, dtype=object)
        for c in in_df.chunks:
            new_chunk_op = op.copy().reset_key()
            # NOTE(review): the assigned value was lost in the source; the
            # map stage is implied by ``execute`` branching on ``op.stage``.
            new_chunk_op._stage = OperandStage.map
            if op.axis == 1:
                summary_shape = (c.shape[0], 1)
            else:
                summary_shape = (1, c.shape[1])
            src_chunks[c.index] = c
            summary_chunks[c.index] = new_chunk_op.new_chunk([c], shape=summary_shape, dtypes=df.dtypes)

        # combine summaries into results
        output_chunk_array = np.empty(in_df.chunk_shape, dtype=object)
        if op.axis == 1:
            for row in range(n_rows):
                row_src = src_chunks[row, :]
                row_summaries = summary_chunks[row, :]
                for col in range(n_cols):
                    output_chunk_array[row, col] = cls._build_combine(op, row_src, row_summaries, col)
        else:
            for col in range(n_cols):
                col_src = src_chunks[:, col]
                col_summaries = summary_chunks[:, col]
                for row in range(n_rows):
                    output_chunk_array[row, col] = cls._build_combine(op, col_src, col_summaries, row)

        output_chunks = list(output_chunk_array.reshape((n_rows * n_cols,)))
        new_op = op.copy().reset_key()
        return new_op.new_tileables(op.inputs, shape=in_df.shape, nsplits=in_df.nsplits,
                                    chunks=output_chunks, dtypes=df.dtypes,
                                    index_value=df.index_value, columns_value=df.columns_value)

    @classmethod
    def _tile_series(cls, op):
        """Tile a multi-chunk Series input."""
        in_series = op.inputs[0]
        series = op.outputs[0]

        # map to get individual results and summaries
        summary_chunks = np.empty(in_series.chunk_shape, dtype=object)
        for c in in_series.chunks:
            new_chunk_op = op.copy().reset_key()
            # NOTE(review): value lost in the source; see ``_tile_dataframe``.
            new_chunk_op._stage = OperandStage.map
            summary_chunks[c.index] = new_chunk_op.new_chunk([c], shape=(1,), dtype=series.dtype)

        # combine summaries into results
        output_chunks = [
            cls._build_combine(op, in_series.chunks, summary_chunks, i) for i in range(len(in_series.chunks))
        ]
        new_op = op.copy().reset_key()
        # NOTE(review): the final arguments were truncated in the source;
        # ``index_value`` mirrors ``_tile_dataframe`` and ``name`` is assumed.
        return new_op.new_tileables(op.inputs, shape=in_series.shape, nsplits=in_series.nsplits,
                                    chunks=output_chunks, dtype=series.dtype,
                                    index_value=series.index_value, name=series.name)

    @classmethod
    def tile(cls, op):
        """Pick the tiling strategy from chunk count and input type."""
        in_df = op.inputs[0]
        if len(in_df.chunks) == 1:
            return cls._tile_one_chunk(op)
        if isinstance(in_df, DATAFRAME_TYPE):
            return cls._tile_dataframe(op)
        else:
            return cls._tile_series(op)

    @staticmethod
    def _get_last_slice(op, df, start):
        """Slice ``df`` from ``start`` onwards along the reduction axis."""
        if op.output_types[0] == OutputType.series:
            return df.iloc[start:]
        else:
            if op.axis == 1:
                return df.iloc[:, start:]
            else:
                return df.iloc[start:, :]

    @classmethod
    def _execute_map(cls, ctx, op):
        """Map stage: store the last row / column of the chunk-local result."""
        in_data = ctx[op.inputs[0].key]
        kwargs = dict()
        if op.axis is not None:
            kwargs['axis'] = op.axis
        if op.skipna is not None:
            kwargs['skipna'] = op.skipna
        partial = getattr(in_data, getattr(cls, '_func_name'))(**kwargs)
        if op.skipna:
            partial.fillna(method='ffill', axis=op.axis, inplace=True)
        ctx[op.outputs[0].key] = cls._get_last_slice(op, partial, -1)

    @classmethod
    def _execute_combine(cls, ctx, op):
        """Combine stage: fold preceding summaries into the source chunk."""
        kwargs = dict()
        if op.axis is not None:
            kwargs['axis'] = op.axis
        if op.skipna is not None:
            kwargs['skipna'] = op.skipna

        if len(op.inputs) > 1:
            ref_datas = [ctx[inp.key] for inp in op.inputs[1:]]
            concat_df = getattr(pd.concat(ref_datas, axis=op.axis), getattr(cls, '_func_name'))(**kwargs)
            if op.skipna:
                concat_df.fillna(method='ffill', axis=op.axis, inplace=True)

            in_data = ctx[op.inputs[0].key]
            concat_df = pd.concat([cls._get_last_slice(op, concat_df, -1), in_data], axis=op.axis)
            result = getattr(concat_df, getattr(cls, '_func_name'))(**kwargs)
            # drop the prepended summary row / column again
            ctx[op.outputs[0].key] = cls._get_last_slice(op, result, 1)
        else:
            ctx[op.outputs[0].key] = getattr(ctx[op.inputs[0].key], getattr(cls, '_func_name'))(**kwargs)

    @classmethod
    def execute(cls, ctx, op):
        """Run the map or combine stage with ``use_inf_as_na`` applied."""
        # NOTE(review): the source body was indented one level deeper than
        # the ``def``, so a wrapping block was lost. A try/finally that
        # resets the pandas option is assumed - confirm.
        try:
            pd.set_option('mode.use_inf_as_na', op.use_inf_as_na)
            if op.stage == OperandStage.map:
                return cls._execute_map(ctx, op)
            else:
                return cls._execute_combine(ctx, op)
        finally:
            pd.reset_option('mode.use_inf_as_na')

    def _call_dataframe(self, df):
        """Build the output DataFrame tileable; dtypes come from a mock run."""
        axis = getattr(self, 'axis', None) or 0
        self._axis = axis = validate_axis(axis, df)

        empty_df = build_empty_df(df.dtypes)
        reduced_df = getattr(empty_df, getattr(self, '_func_name'))(axis=axis)
        return self.new_dataframe([df], shape=df.shape, dtypes=reduced_df.dtypes,
                                  index_value=df.index_value, columns_value=df.columns_value)

    def _call_series(self, series):
        """Build the output Series tileable with the input's shape and dtype."""
        axis = getattr(self, 'axis', None) or 0
        if axis == 'index':
            axis = 0
        self._axis = axis

        # NOTE(review): one argument was lost in the source, leaving a stray
        # comma; ``name=series.name`` is assumed - confirm.
        return self.new_series([series], shape=series.shape, dtype=series.dtype,
                               name=series.name, index_value=series.index_value)

    def __call__(self, a):
        """Dispatch on the input type: DataFrame or Series."""
        if isinstance(a, DATAFRAME_TYPE):
            return self._call_dataframe(a)
        else:
            return self._call_series(a)

class CustomReduction:
    """Base class for user-defined reductions.

    Subclasses implement :meth:`agg` and may override :meth:`pre` and
    :meth:`post`. ``__call_agg__`` chains them as ``pre`` -> ``agg``
    (skipped when ``pre_with_agg`` is true) -> ``post``.
    """

    name: Union[str, None]
    output_limit: Union[int, None]
    kwds: Dict

    # set to True when pre() already performs aggregation
    pre_with_agg = False

    def __init__(self, name=None, is_gpu=False):
        """
        :param name: name of the reduction, ``'<custom>'`` when omitted.
        :param is_gpu: flag reported by :meth:`is_gpu` outside build mode.
        """
        self.name = name or '<custom>'
        self.output_limit = 1
        self._is_gpu = is_gpu

    @property
    def __name__(self):
        return self.name

    def __call__(self, value):
        """Apply the reduction; in build mode, delegate to the graph builder."""
        if is_build_mode():
            from .custom_reduction import build_custom_reduction_result
            return build_custom_reduction_result(value, self)
        return self.__call_agg__(value)

    def __call_agg__(self, value):
        """Run ``pre``, optionally ``agg``, then ``post`` on ``value``."""
        r = self.pre(value)
        if not isinstance(r, tuple):
            r = (r,)
        # update output limit into actual size
        self.output_limit = len(r)

        # only perform aggregation when pre() does not perform aggregation
        if not self.pre_with_agg:
            r = self.agg(*r)
            if not isinstance(r, tuple):
                r = (r,)

        r = self.post(*r)
        return r

    def is_gpu(self):
        """Return the GPU flag, always ``False`` while in build mode."""
        return self._is_gpu if not is_build_mode() else False

    def pre(self, value):  # noqa: R0201  # pylint: disable=no-self-use
        """Pre-processing hook; by default wraps ``value`` in a 1-tuple."""
        return value,

    def agg(self, *values):  # noqa: R0201  # pylint: disable=no-self-use
        """Aggregation hook; must be provided by subclasses."""
        raise NotImplementedError

    def post(self, *value):  # noqa: R0201  # pylint: disable=no-self-use
        """Post-processing hook; by default unwraps a single value."""
        assert len(value) == 1
        return value[0]

    def __mars_tokenize__(self):
        import cloudpickle
        return cloudpickle.dumps(self)
class ReductionPreStep(NamedTuple):
    """Function applied to an input before aggregation."""
    input_key: str
    output_key: str
    columns: Union[List[str], None]
    func: Callable


class ReductionAggStep(NamedTuple):
    """Description of one atomic aggregation."""
    input_key: str
    map_func_name: Union[str, None]
    agg_func_name: Union[str, None]
    custom_reduction: Union[CustomReduction, None]
    output_key: str
    output_limit: int
    kwds: Dict[str, Any]


class ReductionPostStep(NamedTuple):
    """Function applied to aggregation results to produce a final output."""
    input_keys: List[str]
    output_key: str
    func_name: str
    columns: Union[List[str], None]
    func: Callable


class ReductionSteps(NamedTuple):
    """Pre, agg and post steps produced by compiling reduction functions."""
    pre_funcs: List[ReductionPreStep]
    agg_funcs: List[ReductionAggStep]
    post_funcs: List[ReductionPostStep]


# lookup table for numpy arithmetics in pandas
_func_name_converts = dict(
    greater='gt',
    greater_equal='ge',
    less='lt',
    less_equal='le',
    equal='eq',
    not_equal='ne',
    true_divide='truediv',
    floor_divide='floordiv',
    power='pow',
)
# operator used when the method call is unavailable on the operand
_func_name_to_op = dict(
    greater='>',
    gt='>',
    greater_equal='>=',
    ge='>=',  # was '>', which turned ``>=`` into ``>`` in the fallback path
    less='<',
    lt='<',
    less_equal='<=',
    le='<=',
    equal='==',
    eq='==',
    not_equal='!=',
    ne='!=',
    bitwise_and='&',
    __and__='&',
    bitwise_or='|',
    __or__='|',
    bitwise_xor='^',
    __xor__='^',
    add='+',
    subtract='-',
    sub='-',
    multiply='*',
    mul='*',
    true_divide='/',
    truediv='/',
    floor_divide='//',
    floordiv='//',
    power='**',
    pow='**',
    mod='%',
)
_func_compile_cache = dict()  # type: Dict[str, ReductionSteps]


class ReductionCompiler:
    """Compile user reduction functions into pre / agg / post steps.

    Each function is run on a mock Mars object in build mode; the resulting
    tileable graph is split around the atomic aggregation operands, and the
    parts before and after them are turned into generated Python functions.
    """

    def __init__(self, axis=0, store_source=False):
        """
        :param axis: reduction axis.
        :param store_source: when true, generated functions keep their source
                             text in a ``__source__`` attribute.
        """
        self._axis = axis
        self._store_source = store_source

        self._key_to_tileable = dict()
        self._output_tileables = []
        self._lambda_counter = 0
        self._custom_counter = 0
        self._func_cache = dict()

        self._compiled_funcs = []
        self._output_key_to_pre_steps = dict()
        self._output_key_to_pre_cols = dict()
        self._output_key_to_agg_steps = dict()
        self._output_key_to_post_steps = dict()
        self._output_key_to_post_cols = dict()

    @classmethod
    def _check_function_valid(cls, func):
        """Reject functions referencing Mars objects via globals or closures.

        :raises ValueError: when a referenced variable is a Mars object.
        """
        if isinstance(func, functools.partial):
            return cls._check_function_valid(func.func)
        elif isinstance(func, CustomReduction):
            return

        func_code = func.__code__
        func_vars = {n: func.__globals__.get(n) for n in func_code.co_names}
        if func.__closure__:
            func_vars.update({n: cell.cell_contents
                              for n, cell in zip(func_code.co_freevars, func.__closure__)})
        # external Mars objects shall not be referenced
        for var_name, val in func_vars.items():
            if isinstance(val, (Base, Entity)):
                raise ValueError(f'Variable {var_name} used by {func.__name__} '
                                 'cannot be a Mars object')

    def add_function(self, func, ndim, cols=None, func_name=None):
        """Compile ``func`` and register its steps.

        :param func: reduction function or :class:`CustomReduction`.
        :param ndim: number of dimensions of the data the function receives.
        :param cols: columns the function applies to; ignored unless the
                     compiler axis is 0.
        :param func_name: explicit name; defaults to ``func.__name__``.
                          Anonymous lambdas and unnamed custom reductions get
                          numbered placeholder names.
        """
        cols = cols if cols is not None and self._axis == 0 else None

        func_name = func_name or getattr(func, '__name__', None)
        if func_name == '<lambda>' or func_name is None:
            func_name = f'<lambda_{self._lambda_counter}>'
            self._lambda_counter += 1
        if func_name == '<custom>' or func_name is None:
            func_name = f'<custom_{self._custom_counter}>'
            self._custom_counter += 1

        compile_result = self._compile_function(func, func_name, ndim=ndim)
        self._compiled_funcs.append(compile_result)

        for step in compile_result.pre_funcs:
            self._output_key_to_pre_steps[step.output_key] = step
            if step.output_key in self._output_key_to_pre_cols:
                existing_cols = self._output_key_to_pre_cols[step.output_key]
                if existing_cols is not None:
                    # NOTE(review): ``cols`` may be None here, which would
                    # fail in the comprehension - confirm intended handling.
                    existing_col_set = set(existing_cols)
                    self._output_key_to_pre_cols[step.output_key].extend(
                        [c for c in cols if c not in existing_col_set])
            else:
                # first registration of this pre step; compile() looks up
                # every pre step key in this mapping
                self._output_key_to_pre_cols[step.output_key] = list(cols) if cols is not None else None

        for step in compile_result.agg_funcs:
            self._output_key_to_agg_steps[step.output_key] = step

        for step in compile_result.post_funcs:
            self._output_key_to_post_steps[step.output_key] = step
            self._output_key_to_post_cols[step.output_key] = cols

    # NOTE(review): lru_cache on an instance method keys on ``self`` and
    # keeps compiler instances alive for as long as they stay cached.
    @functools.lru_cache(100)
    def _compile_expr_function(self, py_src):
        """Execute generated source and return its ``expr_function``."""
        from ... import tensor, dataframe
        result_store = dict()
        # NOTE(review): this updates the module's own globals in place
        global_vars = globals()
        global_vars.update(dict(mt=tensor, md=dataframe, array=np.array, nan=np.nan))
        exec(py_src, global_vars, result_store)  # noqa: W0122  # nosec  # pylint: disable=exec-used
        fun = result_store['expr_function']
        if self._store_source:
            fun.__source__ = py_src
        return fun

    @staticmethod
    def _build_mock_return_object(func, input_dtype, ndim):
        """Call ``func`` on an empty mock Mars Series (1-d) or DataFrame."""
        from ..initializer import DataFrame as MarsDataFrame, Series as MarsSeries

        if ndim == 1:
            mock_series = build_empty_series(np.dtype(input_dtype))
            mock_obj = MarsSeries(mock_series)
        else:
            mock_df = build_empty_df(pd.Series([np.dtype(input_dtype)] * 2, index=['A', 'B']))
            mock_obj = MarsDataFrame(mock_df)

        # calc target tileable to generate DAG
        return func(mock_obj)

    @enter_mode(build=True)
    def _compile_function(self, func, func_name=None, ndim=1) -> ReductionSteps:
        """Compile one function into :class:`ReductionSteps`.

        Results are cached in ``_func_compile_cache`` under a token derived
        from the function, axis, name and ``ndim``.

        :raises ValueError: when the function returns a non-Mars object, does
                            not reduce dimensionality, or uses an unsupported
                            operand.
        """
        from . import DataFrameAll, DataFrameAny, DataFrameSum, DataFrameProd, \
            DataFrameCount, DataFrameMin, DataFrameMax, DataFrameSize, \
            DataFrameStrConcat, DataFrameCustomReduction
        from ...tensor.arithmetic.core import TensorBinOp, TensorUnaryOp
        from ...tensor.base import TensorWhere
        from ..arithmetic.core import DataFrameBinOp, DataFrameUnaryOp
        from ..datasource.dataframe import DataFrameDataSource
        from ..indexing.where import DataFrameWhere
        from ..datasource.series import SeriesDataSource

        func_token = tokenize(func, self._axis, func_name, ndim)
        if func_token in _func_compile_cache:
            return _func_compile_cache[func_token]
        custom_reduction = func if isinstance(func, CustomReduction) else None

        atomic_agg_op_types = (DataFrameAll, DataFrameAny, DataFrameSum, DataFrameProd,
                               DataFrameCount, DataFrameMin, DataFrameMax, DataFrameSize,
                               DataFrameStrConcat, DataFrameCustomReduction)
        self._check_function_valid(func)

        try:
            func_ret = self._build_mock_return_object(func, float, ndim=ndim)
        except (TypeError, AttributeError):
            # the function may not work on float data (the original comment
            # named a lambda here, but its body was lost in the source);
            # retry with an object series
            func_ret = self._build_mock_return_object(func, object, ndim=1)
        output_limit = getattr(func, 'output_limit', None) or 1

        if not isinstance(func_ret, (Base, Entity)):
            raise ValueError(f'Custom function should return a Mars object, not {type(func_ret)}')
        if func_ret.ndim >= ndim:
            raise ValueError('Function not a reduction')

        agg_graph = func_ret.build_graph()
        agg_tileables = set(t for t in agg_graph if isinstance(t.op, atomic_agg_op_types))
        # check operands before aggregation
        for t in agg_graph.dfs(list(agg_tileables), visit_predicate='all', reverse=True):
            if t not in agg_tileables and \
                    not isinstance(t.op, (DataFrameUnaryOp, DataFrameBinOp, TensorUnaryOp,
                                          TensorBinOp, TensorWhere, DataFrameWhere,
                                          DataFrameDataSource, SeriesDataSource)):
                raise ValueError(f'Cannot support operand {type(t.op)} in aggregation')
        # check operands after aggregation
        for t in agg_graph.dfs(list(agg_tileables), visit_predicate='all'):
            if t not in agg_tileables and \
                    not isinstance(t.op, (DataFrameUnaryOp, DataFrameBinOp, TensorWhere,
                                          DataFrameWhere, TensorUnaryOp, TensorBinOp)):
                raise ValueError(f'Cannot support operand {type(t.op)} in aggregation')

        pre_funcs, agg_funcs, post_funcs = [], [], []
        visited_inputs = set()
        # collect aggregations and their inputs
        for t in agg_tileables:
            agg_input_key = t.inputs[0].key

            # collect agg names
            step_func_name = getattr(t.op, '_func_name')
            if step_func_name in ('count', 'size'):
                # partial counts / sizes are merged by summing them
                map_func_name, agg_func_name = step_func_name, 'sum'
            else:
                map_func_name, agg_func_name = step_func_name, step_func_name

            # build agg description
            agg_funcs.append(ReductionAggStep(
                agg_input_key, map_func_name, agg_func_name, custom_reduction, t.key, output_limit,
                t.op.get_reduction_args(axis=self._axis)
            ))

            # collect agg input and build function
            if agg_input_key not in visited_inputs:
                visited_inputs.add(agg_input_key)
                initial_inputs = list(t.inputs[0].build_graph().iter_indep())
                assert len(initial_inputs) == 1
                input_key = initial_inputs[0].key

                func_str, _ = self._generate_function_str(t.inputs[0])
                pre_funcs.append(ReductionPreStep(
                    input_key, agg_input_key, None, self._compile_expr_function(func_str)
                ))
        # collect function output after agg
        func_str, input_keys = self._generate_function_str(func_ret)
        post_funcs.append(ReductionPostStep(
            input_keys, func_ret.key, func_name, None, self._compile_expr_function(func_str)
        ))
        if len(_func_compile_cache) > 100:  # pragma: no cover
            # evict the oldest inserted entry
            _func_compile_cache.pop(next(iter(_func_compile_cache.keys())))
        result = _func_compile_cache[func_token] = ReductionSteps(pre_funcs, agg_funcs, post_funcs)
        return result

    def _generate_function_str(self, out_tileable):
        """
        Generate python code from tileable DAG

        :param out_tileable: tileable whose computation is turned into code.
        :return: tuple of the source of ``expr_function`` and the list of
                 input keys, in the order of the function's arguments.
        """
        from ...tensor.arithmetic.core import TensorBinOp, TensorUnaryOp
        from ...tensor.base import TensorWhere
        from ...tensor.datasource import Scalar
        from ..arithmetic.core import DataFrameBinOp, DataFrameUnaryOp, DataFrameUnaryUfunc
        from ..datasource.dataframe import DataFrameDataSource
        from ..datasource.series import SeriesDataSource
        from ..indexing.where import DataFrameWhere

        input_key_to_var = OrderedDict()
        local_key_to_var = dict()
        ref_counts = dict()
        ref_visited = set()
        local_lines = []

        input_op_types = (DataFrameDataSource, SeriesDataSource, DataFrameReductionOperand)

        def _calc_ref_counts(t):
            # calculate object refcount for t, this reduces memory usage in functions
            if t.key in ref_visited:
                return
            ref_visited.add(t.key)
            for inp in t.inputs:
                _calc_ref_counts(inp)

                if not isinstance(inp.op, input_op_types):
                    if inp.key not in ref_counts:
                        ref_counts[inp.key] = 0
                    ref_counts[inp.key] += 1

        def _gen_expr_str(t):
            # generate code for t
            if t.key in local_key_to_var:
                return

            if isinstance(t.op, input_op_types):
                # tileable is an input arg, build a function variable
                if t.key not in input_key_to_var:  # pragma: no branch
                    input_key_to_var[t.key] = local_key_to_var[t.key] = f'invar{len(input_key_to_var)}'
            else:
                keys_to_del = []
                for inp in t.inputs:
                    _gen_expr_str(inp)

                    if inp.key in ref_counts:
                        ref_counts[inp.key] -= 1
                        if ref_counts[inp.key] == 0:
                            # the input is no longer referenced, a del statement will be produced
                            keys_to_del.append(inp.key)

                var_name = local_key_to_var[t.key] = f'var{len(local_key_to_var)}'
                keys_to_vars = {inp.key: local_key_to_var[inp.key] for inp in t.inputs}

                def _interpret_var(v):
                    # get representation for variables
                    if hasattr(v, 'key'):
                        return keys_to_vars[v.key]
                    return v

                func_name = func_name_raw = getattr(t.op, '_func_name', None)
                rfunc_name = getattr(t.op, '_rfunc_name', func_name)

                # handle function name differences between numpy and pandas arithmetic ops
                if func_name in _func_name_converts:
                    func_name = _func_name_converts[func_name]
                if rfunc_name in _func_name_converts:
                    rfunc_name = 'r' + _func_name_converts[rfunc_name]

                # build given different op types
                # (body lines of generated try / if blocks carry one extra
                # indentation level relative to the statement opening them)
                if isinstance(t.op, (DataFrameUnaryOp, TensorUnaryOp)):
                    val = _interpret_var(t.inputs[0])
                    if isinstance(t.op, DataFrameUnaryUfunc):
                        statements = [f'{var_name} = np.{func_name_raw}({val})']
                    else:
                        statements = ['try:',
                                      f'    {var_name} = {val}.{func_name}()',
                                      'except AttributeError:',
                                      f'    {var_name} = np.{func_name_raw}({val})']
                elif isinstance(t.op, (DataFrameBinOp, TensorBinOp)):
                    lhs, rhs = t.op.lhs, t.op.rhs
                    op_axis = 1 - self._axis if hasattr(lhs, 'ndim') and hasattr(rhs, 'ndim') \
                        and lhs.ndim != rhs.ndim else None
                    lhs = _interpret_var(lhs)
                    rhs = _interpret_var(rhs)
                    axis_expr = f'axis={op_axis!r}, ' if op_axis is not None else ''
                    op_str = _func_name_to_op[func_name]
                    if t.op.lhs is t.inputs[0]:
                        statements = ['try:',
                                      f'    {var_name} = {lhs}.{func_name}({rhs}, {axis_expr})',
                                      'except AttributeError:',
                                      f'    {var_name} = {lhs} {op_str} {rhs}']
                    else:
                        statements = ['try:',
                                      f'    {var_name} = {rhs}.{rfunc_name}({lhs}, {axis_expr})',
                                      'except AttributeError:',
                                      f'    {var_name} = {rhs} {op_str} {lhs}']
                elif isinstance(t.op, TensorWhere):
                    cond = _interpret_var(t.op.condition)
                    x = _interpret_var(t.op.x)
                    y = _interpret_var(t.op.y)
                    statements = ['if not gpu:',
                                  f'    {var_name} = np.where({cond}, {x}, {y})',
                                  'else:',  # there is a bug with cudf.where
                                  f'    {var_name} = {x}']
                elif isinstance(t.op, DataFrameWhere):
                    func_name = 'mask' if t.op.replace_true else 'where'
                    inp = _interpret_var(t.op.input)
                    cond = _interpret_var(t.op.cond)
                    other = _interpret_var(t.op.other)
                    statements = ['if not gpu:',
                                  f'    {var_name} = {inp}.{func_name}({cond}, {other}, '
                                  f'axis={t.op.axis!r}, level={t.op.level!r})',
                                  'else:',  # there is a bug with cudf.where
                                  f'    {var_name} = {inp}']
                elif isinstance(t.op, Scalar):
                    # for scalar inputs of other operands
                    # NOTE(review): the argument was lost in the source;
                    # ``t.op.data`` is the assumed scalar payload - confirm.
                    data = _interpret_var(t.op.data)
                    statements = [f'{var_name} = {data}']
                else:  # pragma: no cover
                    raise NotImplementedError(f'Does not support aggregating on {type(t.op)}')

                # append del statements for used inputs
                for key in keys_to_del:
                    statements.append(f'del {local_key_to_var[key]}')

                local_lines.extend(statements)

        _calc_ref_counts(out_tileable)
        _gen_expr_str(out_tileable)

        args_str = ', '.join(input_key_to_var.values())
        lines_str = '\n    '.join(local_lines)
        return f"def expr_function({args_str}, gpu=False):\n" \
               f"    {lines_str}\n" \
               f"    return {local_key_to_var[out_tileable.key]}", \
            list(input_key_to_var.keys())

    def compile(self) -> ReductionSteps:
        """Assemble the steps of all added functions.

        Post steps whose columns equal the set of all columns referred to by
        pre steps get ``columns=None``.
        """
        pre_funcs, agg_funcs, post_funcs = [], [], []
        referred_cols = set()
        for key, step in self._output_key_to_pre_steps.items():
            cols = self._output_key_to_pre_cols[key]
            if cols:
                referred_cols.update(cols)
            pre_funcs.append(ReductionPreStep(
                step.input_key, step.output_key, cols, step.func))

        for step in self._output_key_to_agg_steps.values():
            agg_funcs.append(step)

        for key, step in self._output_key_to_post_steps.items():
            cols = self._output_key_to_post_cols[key]
            if cols and set(cols) == set(referred_cols):
                post_cols = None
            else:
                post_cols = cols
            post_funcs.append(ReductionPostStep(
                step.input_keys, step.output_key, step.func_name, post_cols, step.func))

        return ReductionSteps(pre_funcs, agg_funcs, post_funcs)