# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import numpy as np

from ... import opcodes as OperandDef
from ...core import Base, Entity, OutputType
from ...serialize import ValueType, ListField, StringField, BoolField, AnyField
from ...tiles import TilesError
from ...utils import lazy_import, check_chunks_unknown_shape
from ..utils import parse_index, build_empty_df, build_empty_series, \
    standardize_range_index, validate_axis
from ..operands import DataFrameOperand, DataFrameOperandMixin, SERIES_TYPE

cudf = lazy_import('cudf', globals=globals())


class DataFrameConcat(DataFrameOperand, DataFrameOperandMixin):
    _op_type_ = OperandDef.CONCATENATE

    _axis = AnyField('axis')
    _join = StringField('join')
    _ignore_index = BoolField('ignore_index')
    _keys = ListField('keys')
    _levels = ListField('levels')
    _names = ListField('names')
    _verify_integrity = BoolField('verify_integrity')
    _sort = BoolField('sort')
    _copy = BoolField('copy')

    def __init__(self, axis=None, join=None, ignore_index=None,
                 keys=None, levels=None, names=None, verify_integrity=None,
                 sort=None, copy=None, sparse=None, output_types=None, **kw):
        super().__init__(
            _axis=axis, _join=join, _ignore_index=ignore_index,
            _keys=keys, _levels=levels, _names=names,
            _verify_integrity=verify_integrity, _sort=sort, _copy=copy,
            _sparse=sparse, _output_types=output_types, **kw)

    @property
    def axis(self):
        return self._axis

    @property
    def join(self):
        return self._join

    @property
    def ignore_index(self):
        return self._ignore_index

    @property
    def keys(self):
        return self._keys

    @property
    def level(self):
        return self._levels

    @property
    def name(self):
        return self._names

    @property
    def verify_integrity(self):
        return self._verify_integrity

    @property
    def sort(self):
        return self._sort

    @property
    def copy_(self):
        return self._copy

    @classmethod
    def _tile_dataframe(cls, op):
        from ..indexing.iloc import DataFrameIlocGetItem

        out_df = op.outputs[0]
        inputs = op.inputs
        check_chunks_unknown_shape(inputs, TilesError)
        normalized_nsplits = {1: inputs[0].nsplits[1]} if op.axis == 0 \
            else {0: inputs[0].nsplits[0]}
        inputs = [item.rechunk(normalized_nsplits)._inplace_tile()
                  for item in inputs]

        out_chunks = []
        nsplits = []
        cum_index = 0
        for df in inputs:
            for c in df.chunks:
                if op.axis == 0:
                    index = (c.index[0] + cum_index, c.index[1])
                else:
                    index = (c.index[0], c.index[1] + cum_index)
                iloc_op = DataFrameIlocGetItem(indexes=(slice(None),) * 2)
                out_chunks.append(
                    iloc_op.new_chunk([c], shape=c.shape, index=index,
                                      dtypes=c.dtypes,
                                      index_value=c.index_value,
                                      columns_value=c.columns_value))
            nsplits.extend(df.nsplits[op.axis])
            cum_index += len(df.nsplits[op.axis])
        out_nsplits = (tuple(nsplits), inputs[0].nsplits[1]) if op.axis == 0 \
            else (inputs[0].nsplits[0], tuple(nsplits))

        if op.ignore_index:
            out_chunks = standardize_range_index(out_chunks)

        new_op = op.copy()
        return new_op.new_dataframes(op.inputs, out_df.shape,
                                     nsplits=out_nsplits, chunks=out_chunks,
                                     dtypes=out_df.dtypes,
                                     index_value=out_df.index_value,
                                     columns_value=out_df.columns_value)
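    # A rough sketch of what _tile_dataframe produces (illustrative only,
    # not part of the original source): concatenating two DataFrames with
    # nsplits ((2, 2), (3,)) and ((4,), (3,)) along axis=0 leaves axis 1
    # untouched (the column splits already match), emits chunks at indexes
    # (0, 0), (1, 0) and (2, 0), and returns out_nsplits ((2, 2, 4), (3,)).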
    @classmethod
    def _tile_series(cls, op):
        from ..indexing.iloc import SeriesIlocGetItem

        out = op.outputs[0]
        inputs = op.inputs

        out_chunks = []
        if op.axis == 1:
            check_chunks_unknown_shape(inputs, TilesError)
            inputs = [item.rechunk(op.inputs[0].nsplits)._inplace_tile()
                      for item in inputs]
        cum_index = 0
        nsplits = []
        for series in inputs:
            for c in series.chunks:
                if op.axis == 0:
                    index = (c.index[0] + cum_index,)
                    shape = c.shape
                else:
                    index = (c.index[0], cum_index)
                    shape = (c.shape[0], 1)
                iloc_op = SeriesIlocGetItem(indexes=(slice(None),))
                out_chunks.append(
                    iloc_op.new_chunk([c], shape=shape, index=index,
                                      index_value=c.index_value,
                                      dtype=c.dtype, name=c.name))
            if op.axis == 0:
                nsplits.extend(series.nsplits[0])
                cum_index += len(series.nsplits[op.axis])
            else:
                nsplits.append(1)
                cum_index += 1

        if op.ignore_index:
            out_chunks = standardize_range_index(out_chunks)

        new_op = op.copy()
        if op.axis == 0:
            nsplits = (tuple(nsplits),)
            return new_op.new_seriess(op.inputs, out.shape, nsplits=nsplits,
                                      chunks=out_chunks, dtype=out.dtype,
                                      index_value=out.index_value,
                                      name=out.name)
        else:
            nsplits = (inputs[0].nsplits[0], tuple(nsplits))
            return new_op.new_dataframes(op.inputs, out.shape,
                                         nsplits=nsplits, chunks=out_chunks,
                                         dtypes=out.dtypes,
                                         index_value=out.index_value,
                                         columns_value=out.columns_value)

    @classmethod
    def tile(cls, op):
        if isinstance(op.inputs[0], SERIES_TYPE):
            return cls._tile_series(op)
        else:
            return cls._tile_dataframe(op)

    @classmethod
    def execute(cls, ctx, op):
        def _base_concat(chunk, inputs):
            # auto-generated concat when executing a DataFrame, Series,
            # Index or Categorical
            if chunk.op.output_types[0] == OutputType.dataframe:
                return _auto_concat_dataframe_chunks(chunk, inputs)
            elif chunk.op.output_types[0] == OutputType.series:
                return _auto_concat_series_chunks(chunk, inputs)
            elif chunk.op.output_types[0] == OutputType.index:
                return _auto_concat_index_chunks(chunk, inputs)
            elif chunk.op.output_types[0] == OutputType.categorical:
                return _auto_concat_categorical_chunks(chunk, inputs)
            else:  # pragma: no cover
                raise TypeError('Only DataFrameChunk, SeriesChunk, IndexChunk, '
                                'and CategoricalChunk can be automatically concatenated')

        def _auto_concat_dataframe_chunks(chunk, inputs):
            xdf = pd if isinstance(inputs[0], (pd.DataFrame, pd.Series)) \
                or cudf is None else cudf
            if chunk.op.axis is not None:
                return xdf.concat(inputs, axis=op.axis)

            # auto-generated concat when executing a DataFrame
            if len(inputs) == 1:
                ret = inputs[0]
            else:
                n_rows = len(set(inp.index[0] for inp in chunk.inputs))
                n_cols = int(len(inputs) // n_rows)
                assert n_rows * n_cols == len(inputs)

                concats = []
                for i in range(n_rows):
                    if n_cols == 1:
                        concats.append(inputs[i])
                    else:
                        concat = xdf.concat([inputs[i * n_cols + j]
                                             for j in range(n_cols)], axis=1)
                        concats.append(concat)

                if xdf is pd:
                    # The `sort=False` suppresses a `FutureWarning` of pandas,
                    # raised when the indexes or columns of the chunks to
                    # concatenate are not aligned, which may happen for
                    # certain ops.
                    #
                    # See also Note [Columns of Left Join]
                    # in test_merge_execution.py.
                    ret = xdf.concat(concats, sort=False)
                else:
                    ret = xdf.concat(concats)
                    # cuDF loses the index name when concatenating series,
                    # so restore it manually.
                    ret.index.name = concats[0].index.name

            if getattr(chunk.index_value, 'should_be_monotonic', False):
                ret.sort_index(inplace=True)
            if getattr(chunk.columns_value, 'should_be_monotonic', False):
                ret.sort_index(axis=1, inplace=True)
            return ret
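        # Illustrative sketch of the grid reconstruction above (not part of
        # the original source): with 4 input chunks forming a 2x2 grid,
        # n_rows == 2 and n_cols == 2, so [inputs[0], inputs[1]] and
        # [inputs[2], inputs[3]] are first concatenated along axis=1, and
        # the two row results are then stacked along axis=0.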
        def _auto_concat_series_chunks(chunk, inputs):
            # auto-generated concat when executing a Series
            if len(inputs) == 1:
                concat = inputs[0]
            else:
                xdf = pd if isinstance(inputs[0], pd.Series) or cudf is None \
                    else cudf
                if chunk.op.axis is not None:
                    concat = xdf.concat(inputs, axis=chunk.op.axis)
                else:
                    concat = xdf.concat(inputs)
            if getattr(chunk.index_value, 'should_be_monotonic', False):
                concat.sort_index(inplace=True)
            return concat

        def _auto_concat_index_chunks(chunk, inputs):
            if len(inputs) == 1:
                xdf = pd if isinstance(inputs[0], pd.Index) or cudf is None \
                    else cudf
                concat_df = xdf.DataFrame(index=inputs[0])
            else:
                xdf = pd if isinstance(inputs[0], pd.Index) or cudf is None \
                    else cudf
                empty_dfs = [xdf.DataFrame(index=inp) for inp in inputs]
                concat_df = xdf.concat(empty_dfs, axis=0)
            if getattr(chunk.index_value, 'should_be_monotonic', False):
                concat_df.sort_index(inplace=True)
            return concat_df.index

        def _auto_concat_categorical_chunks(_, inputs):
            if len(inputs) == 1:  # pragma: no cover
                return inputs[0]
            else:
                # convert categorical into array
                arrays = [np.asarray(inp) for inp in inputs]
                array = np.concatenate(arrays)
                return pd.Categorical(array,
                                      categories=inputs[0].categories,
                                      ordered=inputs[0].ordered)

        chunk = op.outputs[0]
        inputs = [ctx[input.key] for input in op.inputs]

        if isinstance(inputs[0], tuple):
            ctx[chunk.key] = tuple(
                _base_concat(chunk, [input[i] for input in inputs])
                for i in range(len(inputs[0])))
        else:
            ctx[chunk.key] = _base_concat(chunk, inputs)
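    # Illustrative behaviour of _concat_index below (not part of the
    # original source): pd.RangeIndex(0, 4) followed by pd.RangeIndex(4, 8)
    # is contiguous, so appending them keeps a cheap RangeIndex(0, 8);
    # a non-contiguous pair such as pd.RangeIndex(0, 4) twice would
    # materialize every value, so an empty placeholder index is returned
    # instead.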
    @classmethod
    def _concat_index(cls, prev_index: pd.Index, cur_index: pd.Index):
        if isinstance(prev_index, pd.RangeIndex) and \
                isinstance(cur_index, pd.RangeIndex):
            # Appending RangeIndexes may generate a huge amount of data,
            # e.g. appending pd.RangeIndex(10_000) to pd.RangeIndex(10_000)
            # generates an Int64Index full of data.
            # For details see GH#1647.
            prev_stop = prev_index.start + prev_index.size * prev_index.step
            cur_start = cur_index.start
            if prev_stop == cur_start and prev_index.step == cur_index.step:
                # contiguous RangeIndex, the result is still a RangeIndex
                return prev_index.append(cur_index)
            else:
                # otherwise, return an empty index
                return pd.Index([], dtype=prev_index.dtype)
        elif isinstance(prev_index, pd.RangeIndex):
            return pd.Index([], prev_index.dtype).append(cur_index)
        elif isinstance(cur_index, pd.RangeIndex):
            return prev_index.append(pd.Index([], cur_index.dtype))
        return prev_index.append(cur_index)

    def _call_series(self, objs):
        if self.axis == 0:
            row_length = 0
            index = None
            for series in objs:
                if index is None:
                    index = series.index_value.to_pandas()
                else:
                    index = self._concat_index(
                        index, series.index_value.to_pandas())
                row_length += series.shape[0]
            if self.ignore_index:  # pragma: no cover
                index_value = parse_index(pd.RangeIndex(row_length))
            else:
                index_value = parse_index(index, objs)
            return self.new_series(objs, shape=(row_length,),
                                   dtype=objs[0].dtype,
                                   index_value=index_value,
                                   name=objs[0].name)
        else:
            col_length = 0
            columns = []
            dtypes = dict()
            undefined_name = 0
            for series in objs:
                if series.name is None:
                    # unnamed series fall back to positional column labels
                    dtypes[undefined_name] = series.dtype
                    columns.append(undefined_name)
                    undefined_name += 1
                else:
                    dtypes[series.name] = series.dtype
                    columns.append(series.name)
                col_length += 1

            if self.ignore_index or undefined_name == len(objs):
                columns_value = parse_index(pd.RangeIndex(col_length))
            else:
                columns_value = parse_index(pd.Index(columns), store_data=True)

            shape = (objs[0].shape[0], col_length)
            return self.new_dataframe(objs, shape=shape,
                                      dtypes=pd.Series(dtypes),
                                      index_value=objs[0].index_value,
                                      columns_value=columns_value)
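    # Illustrative sketch of _call_series with axis=1 (not part of the
    # original source): concatenating series named 'a' and 'b' yields a
    # two-column DataFrame with columns ['a', 'b']; if every input series
    # is unnamed, the columns fall back to a RangeIndex of positions.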
    def _call_dataframes(self, objs):
        if self.axis == 0:
            row_length = 0
            index = None
            empty_dfs = []
            for df in objs:
                if index is None:
                    index = df.index_value.to_pandas()
                else:
                    index = self._concat_index(
                        index, df.index_value.to_pandas())
                row_length += df.shape[0]
                if df.ndim == 2:
                    empty_dfs.append(build_empty_df(df.dtypes))
                else:
                    empty_dfs.append(build_empty_series(df.dtype, name=df.name))

            empty_result = pd.concat(empty_dfs, join=self.join, sort=True)
            shape = (row_length, empty_result.shape[1])
            columns_value = parse_index(empty_result.columns, store_data=True)

            if self.join == 'inner':
                objs = [o[list(empty_result.columns)] for o in objs]

            if self.ignore_index:  # pragma: no cover
                index_value = parse_index(pd.RangeIndex(row_length))
            else:
                index_value = parse_index(index, objs)

            new_objs = []
            for obj in objs:
                if obj.ndim != 2:
                    # series
                    new_obj = obj.to_frame().reindex(
                        columns=empty_result.dtypes.index)
                else:
                    # dataframe
                    if list(obj.dtypes.index) != list(empty_result.dtypes.index):
                        new_obj = obj.reindex(columns=empty_result.dtypes.index)
                    else:
                        new_obj = obj
                new_objs.append(new_obj)

            return self.new_dataframe(new_objs, shape=shape,
                                      dtypes=empty_result.dtypes,
                                      index_value=index_value,
                                      columns_value=columns_value)
        else:
            col_length = 0
            empty_dfs = []
            for df in objs:
                if df.ndim == 2:
                    # DataFrame
                    col_length += df.shape[1]
                    empty_dfs.append(build_empty_df(df.dtypes))
                else:
                    # Series
                    col_length += 1
                    empty_dfs.append(build_empty_series(df.dtype, name=df.name))

            empty_result = pd.concat(empty_dfs, join=self.join, axis=1, sort=True)
            if self.ignore_index:
                columns_value = parse_index(pd.RangeIndex(col_length))
            else:
                columns_value = parse_index(pd.Index(empty_result.columns),
                                            store_data=True)

            if self.ignore_index or \
                    len({o.index_value.key for o in objs}) == 1:
                new_objs = [obj if obj.ndim == 2 else obj.to_frame()
                            for obj in objs]
            else:  # pragma: no cover
                raise NotImplementedError('Cannot concatenate dataframes '
                                          'with different indexes')

            shape = (objs[0].shape[0], col_length)
            return self.new_dataframe(new_objs, shape=shape,
                                      dtypes=empty_result.dtypes,
                                      index_value=objs[0].index_value,
                                      columns_value=columns_value)

    def __call__(self, objs):
        if all(isinstance(obj, SERIES_TYPE) for obj in objs):
            self.output_types = [OutputType.series]
            return self._call_series(objs)
        else:
            self.output_types = [OutputType.dataframe]
            return self._call_dataframes(objs)


class GroupByConcat(DataFrameOperand, DataFrameOperandMixin):
    _op_type_ = OperandDef.GROUPBY_CONCAT

    _groups = ListField('groups', ValueType.key)
    _groupby_params = AnyField('groupby_params')

    def __init__(self, groups=None, groupby_params=None, output_types=None, **kw):
        super().__init__(_groups=groups, _groupby_params=groupby_params,
                         _output_types=output_types, **kw)

    @property
    def groups(self):
        return self._groups

    @property
    def groupby_params(self):
        return self._groupby_params

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        inputs_iter = iter(self._inputs)

        new_groups = []
        for _ in self._groups:
            new_groups.append(next(inputs_iter))
        self._groups = new_groups

        if isinstance(self._groupby_params['by'], list):
            by = []
            for v in self._groupby_params['by']:
                if isinstance(v, (Base, Entity)):
                    by.append(next(inputs_iter))
                else:
                    by.append(v)
            self._groupby_params['by'] = by

    @classmethod
    def execute(cls, ctx, op):
        input_data = [ctx[input.key] for input in op.groups]
        obj = pd.concat([d.obj for d in input_data])

        params = op.groupby_params.copy()
        if isinstance(params['by'], list):
            by = []
            for v in params['by']:
                if isinstance(v, Base):
                    by.append(ctx[v.key])
                else:
                    by.append(v)
            params['by'] = by
        selection = params.pop('selection', None)

        result = obj.groupby(**params)
        if selection:
            result = result[selection]

        ctx[op.outputs[0].key] = result


def concat(objs, axis=0, join='outer', ignore_index=False, keys=None,
           levels=None, names=None, verify_integrity=False, sort=False,
           copy=True):
    if not isinstance(objs, (list, tuple)):  # pragma: no cover
        raise TypeError('first argument must be an iterable of dataframe '
                        'or series objects')
    axis = validate_axis(axis)
    if isinstance(objs, dict):  # pragma: no cover
        keys = objs.keys()
        objs = objs.values()
    if axis == 1 and join == 'inner':  # pragma: no cover
        raise NotImplementedError('inner join is not supported '
                                  'when `axis=1` is specified')
    if verify_integrity or sort or keys:  # pragma: no cover
        raise NotImplementedError('the verify_integrity, sort and keys '
                                  'arguments are not supported yet')
    op = DataFrameConcat(axis=axis, join=join, ignore_index=ignore_index,
                         keys=keys, levels=levels, names=names,
                         verify_integrity=verify_integrity, sort=sort,
                         copy=copy)
    return op(objs)
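# A minimal usage sketch (illustrative, not part of the original module;
# it assumes the Mars dataframe API is importable as `mars.dataframe`):
#
#   import mars.dataframe as md
#
#   df1 = md.DataFrame({'a': [1, 2], 'b': [3, 4]})
#   df2 = md.DataFrame({'a': [5, 6], 'b': [7, 8]})
#
#   # row-wise concatenation, renumbering the result with a fresh RangeIndex
#   result = md.concat([df1, df2], ignore_index=True)
#   result.execute()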