# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from collections import OrderedDict
from typing import Any, Callable, Dict, List, NamedTuple, Optional
import numpy as np
import pandas as pd
from ...core import (
OutputType,
ENTITY_TYPE,
is_build_mode,
is_kernel_mode,
enter_mode,
recursive_tile,
)
from ...core.operand import OperandStage
from ...utils import tokenize
from ...serialization.serializables import (
BoolField,
AnyField,
DataTypeField,
Int32Field,
StringField,
)
from ..core import SERIES_TYPE
from ..utils import (
parse_index,
build_df,
build_empty_df,
build_series,
build_empty_series,
validate_axis,
)
from ..operands import DataFrameOperandMixin, DataFrameOperand, DATAFRAME_TYPE


class DataFrameReductionOperand(DataFrameOperand):
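    """
    Base operand for one-shot reductions such as ``sum``, ``mean`` or
    ``max``. The serializable fields below mirror the keyword arguments
    accepted by the corresponding pandas reduction methods.
    """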
_axis = AnyField("axis")
_skipna = BoolField("skipna")
_level = AnyField("level")
_numeric_only = BoolField("numeric_only")
_bool_only = BoolField("bool_only")
_min_count = Int32Field("min_count")
_use_inf_as_na = BoolField("use_inf_as_na")
_method = StringField("method")
_dtype = DataTypeField("dtype")
_combine_size = Int32Field("combine_size")
def __init__(
self,
axis=None,
skipna=None,
level=None,
numeric_only=None,
bool_only=None,
min_count=None,
dtype=None,
combine_size=None,
gpu=None,
sparse=None,
output_types=None,
use_inf_as_na=None,
method=None,
**kw,
):
super().__init__(
_axis=axis,
_skipna=skipna,
_level=level,
_numeric_only=numeric_only,
_bool_only=bool_only,
_min_count=min_count,
_dtype=dtype,
_combine_size=combine_size,
gpu=gpu,
sparse=sparse,
_output_types=output_types,
_use_inf_as_na=use_inf_as_na,
_method=method,
**kw,
)
@property
def axis(self):
return self._axis
@property
def skipna(self):
return self._skipna
@property
def level(self):
return self._level
@property
def numeric_only(self):
return self._numeric_only
@property
def bool_only(self):
return self._bool_only
@property
def min_count(self):
return self._min_count
@property
def dtype(self):
return self._dtype
@property
def combine_size(self):
return self._combine_size
@property
def use_inf_as_na(self):
return self._use_inf_as_na
@property
def is_atomic(self):
return False
@property
def method(self):
return self._method
def get_reduction_args(self, axis=None):
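        # collect only the kwargs explicitly set on the operand; ``axis``
        # is meaningless for Series inputs and is skipped for them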
args = dict(skipna=self.skipna)
if self.inputs[0].ndim > 1:
args["axis"] = axis
if self.numeric_only is not None:
args["numeric_only"] = self.numeric_only
if self.bool_only is not None:
args["bool_only"] = self.bool_only
return {k: v for k, v in args.items() if v is not None}


class DataFrameCumReductionOperand(DataFrameOperand):
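    """
    Base operand for cumulative reductions such as ``cumsum`` or
    ``cummax``, whose outputs keep the shape of their inputs.
    """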
_axis = AnyField("axis")
_skipna = BoolField("skipna")
_use_inf_as_na = BoolField("use_inf_as_na")
_dtype = DataTypeField("dtype")
def __init__(
self,
axis=None,
skipna=None,
dtype=None,
gpu=None,
sparse=None,
output_types=None,
use_inf_as_na=None,
**kw,
):
super().__init__(
_axis=axis,
_skipna=skipna,
_dtype=dtype,
gpu=gpu,
sparse=sparse,
_output_types=output_types,
_use_inf_as_na=use_inf_as_na,
**kw,
)
@property
def axis(self):
return self._axis
@property
def skipna(self):
return self._skipna
@property
def dtype(self):
return self._dtype
@property
def use_inf_as_na(self):
return self._use_inf_as_na


def _default_agg_fun(value, func_name=None, **kw):
if value.ndim == 1:
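        # pandas Series reductions do not accept these DataFrame-only kwargs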
kw.pop("bool_only", None)
kw.pop("numeric_only", None)
return getattr(value, func_name)(**kw)
else:
return getattr(value, func_name)(**kw)
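
# A rough sketch of what the callable built by
# ``DataFrameReductionMixin.get_reduction_callable`` below evaluates to,
# assuming ``_func_name == "sum"`` and ``skipna=True``:
#
#     fun = functools.partial(_default_agg_fun, func_name="sum", skipna=True)
#     fun(value)  # same as value.sum(skipna=True)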


class DataFrameReductionMixin(DataFrameOperandMixin):
@classmethod
def get_reduction_callable(cls, op):
func_name = getattr(op, "_func_name")
kw = dict(
skipna=op.skipna, numeric_only=op.numeric_only, bool_only=op.bool_only
)
kw = {k: v for k, v in kw.items() if v is not None}
fun = functools.partial(_default_agg_fun, func_name=func_name, **kw)
fun.__name__ = func_name
return fun
@classmethod
def tile(cls, op):
in_df = op.inputs[0]
out_df = op.outputs[0]
if isinstance(out_df, SERIES_TYPE):
output_type = OutputType.series
dtypes = pd.Series([out_df.dtype], index=[out_df.name])
index = out_df.index_value.to_pandas()
elif out_df.ndim == 1:
output_type = OutputType.tensor
dtypes, index = out_df.dtype, None
else:
output_type = OutputType.scalar
dtypes, index = out_df.dtype, None
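        # delegate the real work to the input's ``agg``, which knows how to
        # split a reduction into tree-style map / combine / agg chunk graphs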
out_df = yield from recursive_tile(
in_df.agg(
cls.get_reduction_callable(op),
axis=op.axis or 0,
_numeric_only=op.numeric_only,
_bool_only=op.bool_only,
_combine_size=op.combine_size,
_output_type=output_type,
_dtypes=dtypes,
_index=index,
)
)
return [out_df]
def _call_groupby_level(self, df, level):
return df.groupby(level=level).agg(
self.get_reduction_callable(self), method=self.method
)
def _call_dataframe(self, df):
axis = getattr(self, "axis", None) or 0
level = getattr(self, "level", None)
skipna = getattr(self, "skipna", None)
numeric_only = getattr(self, "numeric_only", None)
bool_only = getattr(self, "bool_only", None)
self._axis = axis = validate_axis(axis, df)
func_name = getattr(self, "_func_name")
if level is not None and axis == 1:
raise NotImplementedError("Not support specify level for axis==1")
empty_df = build_df(df, ensure_string=True)
if func_name == "count":
reduced_df = getattr(empty_df, func_name)(
axis=axis, level=level, numeric_only=numeric_only
)
elif func_name == "nunique":
reduced_df = getattr(empty_df, func_name)(axis=axis)
elif func_name in ("all", "any"):
reduced_df = getattr(empty_df, func_name)(
axis=axis, level=level, bool_only=bool_only
)
elif func_name == "size":
reduced_df = pd.Series(
np.zeros(df.shape[1 - axis]),
index=empty_df.columns if axis == 0 else None,
)
elif func_name == "custom_reduction":
reduced_df = getattr(self, "custom_reduction").__call_agg__(empty_df)
elif func_name == "str_concat":
reduced_df = empty_df.apply(
lambda s: s.str.cat(**getattr(self, "get_reduction_args")()), axis=axis
)
else:
reduced_df = getattr(empty_df, func_name)(
axis=axis, level=level, skipna=skipna, numeric_only=numeric_only
)
if level is not None:
return self._call_groupby_level(df[list(reduced_df.columns)], level)
reduced_shape = (df.shape[0],) if axis == 1 else reduced_df.shape
index_value = (
parse_index(reduced_df.index, store_data=True)
if axis == 0
else parse_index(pd.RangeIndex(-1))
)
return self.new_series(
[df], shape=reduced_shape, dtype=reduced_df.dtype, index_value=index_value
)
def _call_series(self, series):
level = getattr(self, "level", None)
axis = getattr(self, "axis", None)
skipna = getattr(self, "skipna", None)
numeric_only = getattr(self, "numeric_only", None)
bool_only = getattr(self, "bool_only", None)
self._axis = axis = validate_axis(axis or 0, series)
func_name = getattr(self, "_func_name")
if level is not None:
return self._call_groupby_level(series, level)
empty_series = build_series(series, ensure_string=True)
if func_name == "count":
reduced_series = empty_series.count(level=level)
elif func_name == "nunique":
reduced_series = empty_series.nunique()
elif func_name in ("all", "any"):
reduced_series = getattr(empty_series, func_name)(
axis=axis, level=level, bool_only=bool_only
)
elif func_name == "size":
reduced_series = empty_series.size
elif func_name == "custom_reduction":
reduced_series = getattr(self, "custom_reduction").__call_agg__(
empty_series
)
elif func_name == "str_concat":
reduced_series = pd.Series(
[empty_series.str.cat(**getattr(self, "get_reduction_args")())]
)
else:
reduced_series = getattr(empty_series, func_name)(
axis=axis, level=level, skipna=skipna, numeric_only=numeric_only
)
return self.new_scalar([series], dtype=np.array(reduced_series).dtype)
def __call__(self, a):
if is_kernel_mode() and not getattr(self, "is_atomic", False):
return self.get_reduction_callable(self)(a)
if isinstance(a, DATAFRAME_TYPE):
return self._call_dataframe(a)
else:
return self._call_series(a)
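
# A minimal sketch of how a concrete reduction operand plugs into the mixin
# above; ``DataFrameSum`` here is a simplified stand-in for the operand
# definitions kept in sibling modules:
#
#     class DataFrameSum(DataFrameReductionOperand, DataFrameReductionMixin):
#         _func_name = "sum"
#
#     DataFrameSum(skipna=True, output_types=[OutputType.series])(df)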


class DataFrameCumReductionMixin(DataFrameOperandMixin):
@classmethod
def _tile_one_chunk(cls, op):
df = op.outputs[0]
params = df.params.copy()
chk = op.inputs[0].chunks[0]
chunk_params = {k: v for k, v in chk.params.items() if k in df.params}
chunk_params["shape"] = df.shape
chunk_params["index"] = chk.index
new_chunk_op = op.copy().reset_key()
chunk = new_chunk_op.new_chunk(op.inputs[0].chunks, kws=[chunk_params])
new_op = op.copy()
nsplits = tuple((s,) for s in chunk.shape)
params["chunks"] = [chunk]
params["nsplits"] = nsplits
return new_op.new_tileables(op.inputs, kws=[params])
@classmethod
def _build_combine(cls, op, input_chunks, summary_chunks, idx):
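        # the chunk at ``idx`` needs its own data plus the one-row (or
        # one-column) summaries of every chunk that precedes it on the axis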
c = input_chunks[idx]
to_concat_chunks = [c]
for j in range(idx):
to_concat_chunks.append(summary_chunks[j])
new_chunk_op = op.copy().reset_key()
new_chunk_op.stage = OperandStage.combine
return new_chunk_op.new_chunk(to_concat_chunks, **c.params)
@classmethod
def _tile_dataframe(cls, op):
in_df = op.inputs[0]
df = op.outputs[0]
n_rows, n_cols = in_df.chunk_shape
# map to get individual results and summaries
        src_chunks = np.empty(in_df.chunk_shape, dtype=object)
        summary_chunks = np.empty(in_df.chunk_shape, dtype=object)
for c in in_df.chunks:
new_chunk_op = op.copy().reset_key()
new_chunk_op.stage = OperandStage.map
if op.axis == 1:
summary_shape = (c.shape[0], 1)
else:
summary_shape = (1, c.shape[1])
src_chunks[c.index] = c
summary_chunks[c.index] = new_chunk_op.new_chunk(
[c], shape=summary_shape, dtypes=df.dtypes
)
# combine summaries into results
        output_chunk_array = np.empty(in_df.chunk_shape, dtype=object)
if op.axis == 1:
for row in range(n_rows):
row_src = src_chunks[row, :]
row_summaries = summary_chunks[row, :]
for col in range(n_cols):
output_chunk_array[row, col] = cls._build_combine(
op, row_src, row_summaries, col
)
else:
for col in range(n_cols):
col_src = src_chunks[:, col]
col_summaries = summary_chunks[:, col]
for row in range(n_rows):
output_chunk_array[row, col] = cls._build_combine(
op, col_src, col_summaries, row
)
output_chunks = list(output_chunk_array.reshape((n_rows * n_cols,)))
new_op = op.copy().reset_key()
return new_op.new_tileables(
op.inputs,
shape=in_df.shape,
nsplits=in_df.nsplits,
chunks=output_chunks,
dtypes=df.dtypes,
index_value=df.index_value,
columns_value=df.columns_value,
)
@classmethod
def _tile_series(cls, op):
in_series = op.inputs[0]
series = op.outputs[0]
# map to get individual results and summaries
        summary_chunks = np.empty(in_series.chunk_shape, dtype=object)
for c in in_series.chunks:
new_chunk_op = op.copy().reset_key()
new_chunk_op.stage = OperandStage.map
summary_chunks[c.index] = new_chunk_op.new_chunk(
[c], shape=(1,), dtype=series.dtype
)
# combine summaries into results
output_chunks = [
cls._build_combine(op, in_series.chunks, summary_chunks, i)
for i in range(len(in_series.chunks))
]
new_op = op.copy().reset_key()
return new_op.new_tileables(
op.inputs,
shape=in_series.shape,
nsplits=in_series.nsplits,
chunks=output_chunks,
dtype=series.dtype,
index_value=series.index_value,
name=series.name,
)
@classmethod
def tile(cls, op):
in_df = op.inputs[0]
if len(in_df.chunks) == 1:
return cls._tile_one_chunk(op)
if isinstance(in_df, DATAFRAME_TYPE):
return cls._tile_dataframe(op)
else:
return cls._tile_series(op)
@staticmethod
def _get_last_slice(op, df, start):
if op.output_types[0] == OutputType.series:
return df.iloc[start:]
else:
if op.axis == 1:
return df.iloc[:, start:]
else:
return df.iloc[start:, :]
@classmethod
def _execute_map(cls, ctx, op):
in_data = ctx[op.inputs[0].key]
kwargs = dict()
if op.axis is not None:
kwargs["axis"] = op.axis
if op.skipna is not None:
kwargs["skipna"] = op.skipna
partial = getattr(in_data, getattr(cls, "_func_name"))(**kwargs)
if op.skipna:
partial.fillna(method="ffill", axis=op.axis, inplace=True)
ctx[op.outputs[0].key] = cls._get_last_slice(op, partial, -1)
@classmethod
def _execute_combine(cls, ctx, op):
kwargs = dict()
if op.axis is not None:
kwargs["axis"] = op.axis
if op.skipna is not None:
kwargs["skipna"] = op.skipna
if len(op.inputs) > 1:
ref_datas = [ctx[inp.key] for inp in op.inputs[1:]]
concat_df = getattr(
pd.concat(ref_datas, axis=op.axis), getattr(cls, "_func_name")
)(**kwargs)
if op.skipna:
concat_df.fillna(method="ffill", axis=op.axis, inplace=True)
in_data = ctx[op.inputs[0].key]
concat_df = pd.concat(
[cls._get_last_slice(op, concat_df, -1), in_data], axis=op.axis
)
result = getattr(concat_df, getattr(cls, "_func_name"))(**kwargs)
ctx[op.outputs[0].key] = cls._get_last_slice(op, result, 1)
else:
ctx[op.outputs[0].key] = getattr(
ctx[op.inputs[0].key], getattr(cls, "_func_name")
)(**kwargs)
@classmethod
def execute(cls, ctx, op):
try:
pd.set_option("mode.use_inf_as_na", op.use_inf_as_na)
if op.stage == OperandStage.map:
return cls._execute_map(ctx, op)
else:
return cls._execute_combine(ctx, op)
finally:
pd.reset_option("mode.use_inf_as_na")
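
    # Rough shape of the map/combine stages above for ``cumsum`` over two
    # successive chunks A and B (a sketch, not the exact runtime calls):
    #
    #     map:     A -> cumsum(A); its last row is kept as the summary s_A
    #     combine: B -> cumsum(concat([s_A, B])) with the leading summary
    #              row sliced off, so B's prefix sums include all of A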
def _call_dataframe(self, df):
axis = getattr(self, "axis", None) or 0
self._axis = axis = validate_axis(axis, df)
empty_df = build_empty_df(df.dtypes)
reduced_df = getattr(empty_df, getattr(self, "_func_name"))(axis=axis)
return self.new_dataframe(
[df],
shape=df.shape,
dtypes=reduced_df.dtypes,
index_value=df.index_value,
columns_value=df.columns_value,
)
def _call_series(self, series):
axis = getattr(self, "axis", None) or 0
if axis == "index":
axis = 0
self._axis = axis
return self.new_series(
[series],
shape=series.shape,
dtype=series.dtype,
name=series.name,
index_value=series.index_value,
)
def __call__(self, a):
if isinstance(a, DATAFRAME_TYPE):
return self._call_dataframe(a)
else:
return self._call_series(a)


class CustomReduction:
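    """
    Base class for user-defined reductions, evaluated in three phases:
    ``pre`` maps input chunks, ``agg`` combines mapped pieces, and
    ``post`` finalizes the aggregated result.

    A minimal sketch of a mean implemented this way (an illustration,
    not one of the built-in reductions):

        class MeanReduction(CustomReduction):
            def pre(self, value):  # runs on every chunk
                return value.sum(), value.count()

            def agg(self, s, cnt):  # combines chunk results
                return s.sum(), cnt.sum()

            def post(self, s, cnt):  # produces the final value
                return s / cnt
    """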
name: Optional[str]
output_limit: Optional[int]
kwds: Dict
# set to True when pre() already performs aggregation
pre_with_agg = False
    def __init__(self, name=None, is_gpu=False):
self.name = name or "<custom>"
self.output_limit = 1
self._is_gpu = is_gpu
@property
def __name__(self):
return self.name
def __call__(self, value):
if is_build_mode():
from .custom_reduction import build_custom_reduction_result
return build_custom_reduction_result(value, self)
return self.__call_agg__(value)
def __call_agg__(self, value):
r = self.pre(value)
if not isinstance(r, tuple):
r = (r,)
        # update output_limit to the actual number of outputs
self.output_limit = len(r)
# only perform aggregation when pre() does not perform aggregation
if not self.pre_with_agg:
r = self.agg(*r)
if not isinstance(r, tuple):
r = (r,)
r = self.post(*r)
return r
def is_gpu(self):
return self._is_gpu if not is_build_mode() else False
def pre(self, value): # noqa: R0201 # pylint: disable=no-self-use
return (value,)
def agg(self, *values): # noqa: R0201 # pylint: disable=no-self-use
raise NotImplementedError
def post(self, *value): # noqa: R0201 # pylint: disable=no-self-use
assert len(value) == 1
return value[0]
def __mars_tokenize__(self):
import cloudpickle
return cloudpickle.dumps(self)


class ReductionPreStep(NamedTuple):
input_key: str
output_key: str
columns: Optional[List[str]]
func: Callable


class ReductionAggStep(NamedTuple):
input_key: str
map_func_name: Optional[str]
agg_func_name: Optional[str]
custom_reduction: Optional[CustomReduction]
output_key: str
output_limit: int
kwds: Dict[str, Any]


class ReductionPostStep(NamedTuple):
input_keys: List[str]
output_key: str
func_name: str
columns: Optional[List[str]]
func: Callable


class ReductionSteps(NamedTuple):
pre_funcs: List[ReductionPreStep]
agg_funcs: List[ReductionAggStep]
post_funcs: List[ReductionPostStep]
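
# How a composed reduction decomposes into the steps above, taking
# ``lambda x: (x ** 2).sum() + 1`` as a sketch:
#
#     pre:  x -> x ** 2    (element-wise, runs before aggregation)
#     agg:  "sum" applied to mapped chunks, then to combined partials
#     post: s -> s + 1     (runs once on the aggregated value)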


# lookup table translating numpy arithmetic function names to their pandas
# method counterparts
_func_name_converts = dict(
greater="gt",
greater_equal="ge",
less="lt",
less_equal="le",
equal="eq",
not_equal="ne",
true_divide="truediv",
floor_divide="floordiv",
power="pow",
)


_func_name_to_op = dict(
greater=">",
gt=">",
greater_equal=">=",
ge=">",
less="<",
lt="<",
less_equal="<=",
le="<=",
equal="==",
eq="==",
not_equal="!=",
ne="!=",
bitwise_and="&",
__and__="&",
bitwise_or="|",
__or__="|",
bitwise_xor="^",
__xor__="^",
add="+",
subtract="-",
sub="-",
multiply="*",
mul="*",
true_divide="/",
truediv="/",
floor_divide="//",
floordiv="//",
power="**",
pow="**",
mod="%",
)


_func_compile_cache = dict()  # type: Dict[str, ReductionSteps]


class ReductionCompiler:
def __init__(self, axis=0, store_source=False):
self._axis = axis
self._store_source = store_source
self._key_to_tileable = dict()
self._output_tileables = []
self._lambda_counter = 0
self._custom_counter = 0
self._func_cache = dict()
self._compiled_funcs = []
self._output_key_to_pre_steps = dict()
self._output_key_to_pre_cols = dict()
self._output_key_to_agg_steps = dict()
self._output_key_to_post_steps = dict()
self._output_key_to_post_cols = dict()
@classmethod
def _check_function_valid(cls, func):
if isinstance(func, functools.partial):
return cls._check_function_valid(func.func)
elif isinstance(func, CustomReduction):
return
func_code = func.__code__
func_vars = {n: func.__globals__.get(n) for n in func_code.co_names}
if func.__closure__:
func_vars.update(
{
n: cell.cell_contents
for n, cell in zip(func_code.co_freevars, func.__closure__)
}
)
# external Mars objects shall not be referenced
for var_name, val in func_vars.items():
if isinstance(val, ENTITY_TYPE):
raise ValueError(
f"Variable {var_name} used by {func.__name__} "
"cannot be a Mars object"
)
@staticmethod
def _update_col_dict(col_dict: Dict, key: str, cols: List):
if key in col_dict:
existing_cols = col_dict[key]
if existing_cols is not None:
existing_col_set = set(existing_cols)
col_dict[key].extend([c for c in cols if c not in existing_col_set])
else:
col_dict[key] = list(cols) if cols is not None else None
def add_function(self, func, ndim, cols=None, func_name=None):
cols = cols if cols is not None and self._axis == 0 else None
func_name = func_name or getattr(func, "__name__", None)
if func_name == "<lambda>" or func_name is None:
func_name = f"<lambda_{self._lambda_counter}>"
self._lambda_counter += 1
if func_name == "<custom>" or func_name is None:
func_name = f"<custom_{self._custom_counter}>"
self._custom_counter += 1
compile_result = self._compile_function(func, func_name, ndim=ndim)
self._compiled_funcs.append(compile_result)
for step in compile_result.pre_funcs:
self._output_key_to_pre_steps[step.output_key] = step
self._update_col_dict(self._output_key_to_pre_cols, step.output_key, cols)
for step in compile_result.agg_funcs:
self._output_key_to_agg_steps[step.output_key] = step
for step in compile_result.post_funcs:
self._output_key_to_post_steps[step.output_key] = step
self._update_col_dict(self._output_key_to_post_cols, step.output_key, cols)
@functools.lru_cache(100)
def _compile_expr_function(self, py_src):
from ... import tensor, dataframe
result_store = dict()
global_vars = globals()
global_vars.update(dict(mt=tensor, md=dataframe, array=np.array, nan=np.nan))
exec(
py_src, global_vars, result_store
) # noqa: W0122 # nosec # pylint: disable=exec-used
fun = result_store["expr_function"]
if self._store_source:
fun.__source__ = py_src
return fun
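
    # What ``py_src`` typically looks like here (a sketch; real sources are
    # emitted by ``_generate_function_str`` below):
    #
    #     def expr_function(invar0, gpu=False):
    #         var1 = invar0 + 1
    #         return var1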
@staticmethod
def _build_mock_return_object(func, input_dtype, ndim):
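        # build a tiny in-memory object of the requested dtype so that
        # calling ``func`` on it records a tileable graph without
        # computing anything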
from ..initializer import DataFrame as MarsDataFrame, Series as MarsSeries
if ndim == 1:
mock_series = build_empty_series(np.dtype(input_dtype))
mock_obj = MarsSeries(mock_series)
else:
mock_df = build_empty_df(
pd.Series([np.dtype(input_dtype)] * 2, index=["A", "B"])
)
mock_obj = MarsDataFrame(mock_df)
        # run func on the mock object to build the tileable DAG
with enter_mode(kernel=True):
return func(mock_obj)
@enter_mode(build=True)
def _compile_function(self, func, func_name=None, ndim=1) -> ReductionSteps:
from ...tensor.arithmetic.core import TensorBinOp, TensorUnaryOp
from ...tensor.base import TensorWhere
from ..arithmetic.core import DataFrameBinOp, DataFrameUnaryOp
from ..datasource.dataframe import DataFrameDataSource
from ..indexing.where import DataFrameWhere
from ..datasource.series import SeriesDataSource
func_token = tokenize(func, self._axis, func_name, ndim)
if func_token in _func_compile_cache:
return _func_compile_cache[func_token]
custom_reduction = func if isinstance(func, CustomReduction) else None
self._check_function_valid(func)
try:
func_ret = self._build_mock_return_object(func, float, ndim=ndim)
except (TypeError, AttributeError):
            # functions like ``lambda x: x.str.cat(...)`` fail on numeric
            # mocks, so retry with an object-dtype series
func_ret = self._build_mock_return_object(func, object, ndim=1)
output_limit = getattr(func, "output_limit", None) or 1
if not isinstance(func_ret, ENTITY_TYPE):
raise ValueError(
f"Custom function should return a Mars object, not {type(func_ret)}"
)
if func_ret.ndim >= ndim:
raise ValueError("Function not a reduction")
agg_graph = func_ret.build_graph()
agg_tileables = set(t for t in agg_graph if getattr(t.op, "is_atomic", False))
# check operands before aggregation
for t in agg_graph.dfs(
list(agg_tileables), visit_predicate="all", reverse=True
):
if t not in agg_tileables and not isinstance(
t.op,
(
DataFrameUnaryOp,
DataFrameBinOp,
TensorUnaryOp,
TensorBinOp,
TensorWhere,
DataFrameWhere,
DataFrameDataSource,
SeriesDataSource,
),
):
raise ValueError(f"Cannot support operand {type(t.op)} in aggregation")
# check operands after aggregation
for t in agg_graph.dfs(list(agg_tileables), visit_predicate="all"):
if t not in agg_tileables and not isinstance(
t.op,
(
DataFrameUnaryOp,
DataFrameBinOp,
TensorWhere,
DataFrameWhere,
TensorUnaryOp,
TensorBinOp,
),
):
raise ValueError(f"Cannot support operand {type(t.op)} in aggregation")
pre_funcs, agg_funcs, post_funcs = [], [], []
visited_inputs = set()
# collect aggregations and their inputs
for t in agg_tileables:
agg_input_key = t.inputs[0].key
# collect agg names
step_func_name = getattr(t.op, "_func_name")
if step_func_name in ("count", "size"):
map_func_name, agg_func_name = step_func_name, "sum"
else:
map_func_name, agg_func_name = step_func_name, step_func_name
# build agg description
agg_funcs.append(
ReductionAggStep(
agg_input_key,
map_func_name,
agg_func_name,
custom_reduction,
t.key,
output_limit,
t.op.get_reduction_args(axis=self._axis),
)
)
# collect agg input and build function
if agg_input_key not in visited_inputs:
visited_inputs.add(agg_input_key)
initial_inputs = list(t.inputs[0].build_graph().iter_indep())
assert len(initial_inputs) == 1
input_key = initial_inputs[0].key
func_str, _ = self._generate_function_str(t.inputs[0])
pre_funcs.append(
ReductionPreStep(
input_key,
agg_input_key,
None,
self._compile_expr_function(func_str),
)
)
# collect function output after agg
func_str, input_keys = self._generate_function_str(func_ret)
post_funcs.append(
ReductionPostStep(
input_keys,
func_ret.key,
func_name,
None,
self._compile_expr_function(func_str),
)
)
if len(_func_compile_cache) > 100: # pragma: no cover
_func_compile_cache.pop(next(iter(_func_compile_cache.keys())))
result = _func_compile_cache[func_token] = ReductionSteps(
pre_funcs, agg_funcs, post_funcs
)
return result
def _generate_function_str(self, out_tileable):
"""
Generate python code from tileable DAG
"""
from ...tensor.arithmetic.core import TensorBinOp, TensorUnaryOp
from ...tensor.base import TensorWhere
from ...tensor.datasource import Scalar
from ..arithmetic.core import (
DataFrameBinOp,
DataFrameUnaryOp,
DataFrameUnaryUfunc,
)
from ..datasource.dataframe import DataFrameDataSource
from ..datasource.series import SeriesDataSource
from ..indexing.where import DataFrameWhere
input_key_to_var = OrderedDict()
local_key_to_var = dict()
ref_counts = dict()
ref_visited = set()
local_lines = []
input_op_types = (
DataFrameDataSource,
SeriesDataSource,
DataFrameReductionOperand,
)
def _calc_ref_counts(t):
            # calculate reference counts for intermediates of t so the
            # generated code can free them early and reduce memory usage
if t.key in ref_visited:
return
ref_visited.add(t.key)
for inp in t.inputs:
_calc_ref_counts(inp)
if not isinstance(inp.op, input_op_types):
if inp.key not in ref_counts:
ref_counts[inp.key] = 0
ref_counts[inp.key] += 1
def _gen_expr_str(t):
# generate code for t
if t.key in local_key_to_var:
return
if isinstance(t.op, input_op_types):
# tileable is an input arg, build a function variable
if t.key not in input_key_to_var: # pragma: no branch
input_key_to_var[t.key] = local_key_to_var[
t.key
] = f"invar{len(input_key_to_var)}"
else:
keys_to_del = []
for inp in t.inputs:
_gen_expr_str(inp)
if inp.key in ref_counts:
ref_counts[inp.key] -= 1
if ref_counts[inp.key] == 0:
# the input is no longer referenced, a del statement will be produced
keys_to_del.append(inp.key)
var_name = local_key_to_var[t.key] = f"var{len(local_key_to_var)}"
keys_to_vars = {inp.key: local_key_to_var[inp.key] for inp in t.inputs}
def _interpret_var(v):
# get representation for variables
if hasattr(v, "key"):
return keys_to_vars[v.key]
return v
func_name = func_name_raw = getattr(t.op, "_func_name", None)
rfunc_name = getattr(t.op, "_rfunc_name", func_name)
if func_name is None:
func_name = func_name_raw = getattr(t.op, "_bit_func_name", None)
rfunc_name = getattr(t.op, "_bit_rfunc_name", func_name)
# handle function name differences between numpy and pandas arithmetic ops
if func_name in _func_name_converts:
func_name = _func_name_converts[func_name]
if rfunc_name in _func_name_converts:
rfunc_name = "r" + _func_name_converts[rfunc_name]
            # build statements according to the op type
if isinstance(t.op, (DataFrameUnaryOp, TensorUnaryOp)):
val = _interpret_var(t.inputs[0])
if isinstance(t.op, DataFrameUnaryUfunc):
statements = [f"{var_name} = np.{func_name_raw}({val})"]
else:
statements = [
f"try:",
f" {var_name} = {val}.{func_name}()",
f"except AttributeError:",
f" {var_name} = np.{func_name_raw}({val})",
]
elif isinstance(t.op, (DataFrameBinOp, TensorBinOp)):
lhs, rhs = t.op.lhs, t.op.rhs
op_axis = (
1 - self._axis
if hasattr(lhs, "ndim")
and hasattr(rhs, "ndim")
and lhs.ndim != rhs.ndim
else None
)
lhs = _interpret_var(lhs)
rhs = _interpret_var(rhs)
axis_expr = f"axis={op_axis!r}, " if op_axis is not None else ""
op_str = _func_name_to_op[func_name]
if t.op.lhs is t.inputs[0]:
statements = [
f"try:",
f" {var_name} = {lhs}.{func_name}({rhs}, {axis_expr})",
f"except AttributeError:",
f" {var_name} = {lhs} {op_str} {rhs}",
]
else:
statements = [
f"try:",
f" {var_name} = {rhs}.{rfunc_name}({lhs}, {axis_expr})",
f"except AttributeError:",
f" {var_name} = {rhs} {op_str} {lhs}",
]
elif isinstance(t.op, TensorWhere):
cond = _interpret_var(t.op.condition)
x = _interpret_var(t.op.x)
y = _interpret_var(t.op.y)
statements = [
f"if not gpu:",
f" {var_name} = np.where({cond}, {x}, {y})",
f"else:", # there is a bug with cudf.where
f" {var_name} = {x}",
]
elif isinstance(t.op, DataFrameWhere):
func_name = "mask" if t.op.replace_true else "where"
inp = _interpret_var(t.op.input)
cond = _interpret_var(t.op.cond)
other = _interpret_var(t.op.other)
statements = [
f"if not gpu:",
f" {var_name} = {inp}.{func_name}({cond}, {other}, "
f"axis={t.op.axis!r}, level={t.op.level!r})",
f"else:", # there is a bug with cudf.where
f" {var_name} = {inp}",
]
elif isinstance(t.op, Scalar):
# for scalar inputs of other operands
data = _interpret_var(t.op.data)
statements = [f"{var_name} = {data}"]
else: # pragma: no cover
raise NotImplementedError(
f"Does not support aggregating on {type(t.op)}"
)
# append del statements for used inputs
for key in keys_to_del:
statements.append(f"del {local_key_to_var[key]}")
local_lines.extend(statements)
_calc_ref_counts(out_tileable)
_gen_expr_str(out_tileable)
args_str = ", ".join(input_key_to_var.values())
lines_str = "\n ".join(local_lines)
return (
f"def expr_function({args_str}, gpu=False):\n"
f" {lines_str}\n"
f" return {local_key_to_var[out_tileable.key]}",
list(input_key_to_var.keys()),
)
def compile(self) -> ReductionSteps:
pre_funcs, agg_funcs, post_funcs = [], [], []
referred_cols = set()
for key, step in self._output_key_to_pre_steps.items():
cols = self._output_key_to_pre_cols[key]
if cols:
referred_cols.update(cols)
pre_funcs.append(
ReductionPreStep(step.input_key, step.output_key, cols, step.func)
)
for step in self._output_key_to_agg_steps.values():
agg_funcs.append(step)
for key, step in self._output_key_to_post_steps.items():
cols = self._output_key_to_post_cols[key]
if cols and set(cols) == set(referred_cols):
post_cols = None
else:
post_cols = cols
post_funcs.append(
ReductionPostStep(
step.input_keys,
step.output_key,
step.func_name,
post_cols,
step.func,
)
)
return ReductionSteps(pre_funcs, agg_funcs, post_funcs)
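

# A minimal sketch of driving the compiler directly; in practice this is
# done by the aggregation operands rather than by user code:
#
#     compiler = ReductionCompiler(axis=0)
#     compiler.add_function(lambda x: x.max() - x.min(), ndim=2)
#     steps = compiler.compile()  # ReductionSteps(pre_funcs, agg_funcs, ...)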