# Source code for mars.learn.contrib.tensorflow.dataset

# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import copy
from typing import List, Tuple

import pandas as pd
import numpy as np

from .... import execute
from ....core.context import get_context
from ....tensor.core import TENSOR_TYPE
from ....dataframe.core import DATAFRAME_TYPE, SERIES_TYPE
from ....utils import require_not_none, lazy_import

tf = lazy_import("tensorflow")


# Every input type MarsDataset accepts, usable directly with ``isinstance``.
# Order matters: the three Mars types come first because
# ``MarsDataset._execute`` selects them with ``ACCEPT_TYPE[:3]``.
ACCEPT_TYPE = (TENSOR_TYPE, DATAFRAME_TYPE, SERIES_TYPE) + (
    np.ndarray,
    pd.DataFrame,
    pd.Series,
    List,
)


@require_not_none(tf)
class MarsDataset:
    """Adapter exposing Mars / NumPy / pandas / list data as a ``tf.data.Dataset``.

    The inputs are validated against ``ACCEPT_TYPE``, Mars-typed inputs are
    executed, and per-element shapes and dtypes are inferred unless the caller
    supplies them. ``to_tf`` then builds a generator-backed dataset that yields
    one element (or one tuple of elements) per index of the first input.

    Parameters
    ----------
    tensors:
        A single accepted object or a tuple of accepted objects. Anything that
        is not a tuple is treated as a single input.
    output_shapes:
        Optional per-element shape(s); inferred from the inputs when falsy.
    output_types:
        Optional per-element dtype(s); inferred from the inputs when falsy.
    fetch_kwargs:
        Optional keyword arguments forwarded to ``fetch()`` on Mars objects.

    Raises
    ------
    ValueError
        If ``tensors`` is an empty tuple.
    TypeError
        If any input is not an instance of ``ACCEPT_TYPE``.
    """

    def __init__(
        self,
        tensors,
        output_shapes: Tuple[int, ...] = None,
        output_types: Tuple[np.dtype, ...] = None,
        fetch_kwargs=None,
    ):
        self._context = get_context()
        self._tensors = tensors
        self._output_shapes = output_shapes
        self._output_types = output_types
        self._fetch_kwargs = fetch_kwargs or dict()
        self._executed = False
        self._check_and_convert()

    @staticmethod
    def _infer_shape(t):
        """Return the shape of one element of ``t`` (shape without the first axis)."""
        # Lists and series are treated as sequences of scalars.
        if isinstance(t, (list, SERIES_TYPE, pd.Series)):
            return ()
        return t.shape[1:]

    @staticmethod
    def _infer_dtype(t):
        """Return the element type of ``t`` as a Python type or dtype object."""
        if isinstance(t, list):
            return type(t[0])
        if isinstance(t, (DATAFRAME_TYPE, pd.DataFrame)):
            # Use the first column *by position*: looking it up by the label
            # ``0`` fails for frames whose columns are named.
            return t.dtypes.iloc[0]
        return t.dtype

    def _check_and_convert(self):
        """Normalize the inputs to a tuple, validate them and infer metadata."""
        if not isinstance(self._tensors, tuple):
            self._tensors = (self._tensors,)
        if not self._tensors:
            # Nothing to iterate over; the generator would fail on
            # ``self._tensors[0]`` later, so reject it up front.
            raise ValueError("At least one dataset input is required")
        for t in self._tensors:
            if not isinstance(t, ACCEPT_TYPE):
                raise TypeError(f"Unexpected dataset type: {type(t)}")

        # Execute before reading shapes so the metadata of Mars objects is
        # available.
        if not self._executed:
            self._execute()
            self._executed = True

        # A single input yields bare elements; several inputs yield tuples,
        # so the inferred structure mirrors that.
        single_input = len(self._tensors) == 1

        if not self._output_shapes:
            shapes = tuple(self._infer_shape(t) for t in self._tensors)
            self._output_shapes = shapes[0] if single_input else shapes

        if not self._output_types:
            # Convert to TensorFlow dtypes on both paths for consistency.
            dtypes = tuple(tf.as_dtype(self._infer_dtype(t)) for t in self._tensors)
            self._output_types = dtypes[0] if single_input else dtypes

    def _execute(self):  # pragma: no cover
        """Run ``execute`` on the inputs that are Mars objects."""
        # The first three entries of ACCEPT_TYPE are the Mars types.
        execute_data = [t for t in self._tensors if isinstance(t, ACCEPT_TYPE[:3])]

        if execute_data:
            execute(execute_data)

    def get_data(self, t, index):  # pragma: no cover
        """Return the element of ``t`` at position ``index``.

        Mars objects are indexed and then fetched with a copy of the
        configured ``fetch_kwargs``; DataFrame rows are returned as their
        ``.values``; everything else is indexed directly.
        """
        # coverage not included as now there is no solution to cover tensorflow methods
        # see https://github.com/tensorflow/tensorflow/issues/33759 for more details.
        # Deep-copy so a fetch call cannot mutate the stored kwargs.
        fetch_kwargs = copy.deepcopy(self._fetch_kwargs) if self._fetch_kwargs else {}

        if isinstance(t, TENSOR_TYPE):
            return t[index].fetch(**fetch_kwargs)
        elif isinstance(t, np.ndarray):
            return t[index]
        elif isinstance(t, DATAFRAME_TYPE):
            return t.iloc[index].fetch(**fetch_kwargs).values
        elif isinstance(t, SERIES_TYPE):
            return t.iloc[index].fetch(**fetch_kwargs)
        elif isinstance(t, pd.DataFrame):
            return t.iloc[index].values
        elif isinstance(t, pd.Series):
            return t.iloc[index]
        else:
            return t[index]

    def to_tf(self) -> "tf.data.Dataset":
        """Get TF Dataset.

        Convert into a ``tensorflow.data.Dataset`` backed by a Python generator
        that walks the inputs index by index.
        """

        def make_generator():  # pragma: no cover
            if not self._executed:
                self._execute()
                self._executed = True

            # The first input determines how many elements are produced.
            for i in range(len(self._tensors[0])):
                if len(self._tensors) == 1:
                    yield self.get_data(self._tensors[0], i)
                else:
                    yield tuple(self.get_data(t, i) for t in self._tensors)

        return tf.data.Dataset.from_generator(
            make_generator,
            output_types=self._output_types,
            output_shapes=self._output_shapes,
        )


def gen_tensorflow_dataset(
    tensors,
    output_shapes: Tuple[int, ...] = None,
    output_types: Tuple[np.dtype, ...] = None,
    fetch_kwargs=None,
):
    """
    Convert Mars data to a ``tf.data.Dataset``.

    Note that this is based on TensorFlow 2.0.

    Parameters
    ----------
    tensors: Mars data type or a tuple consisting of Mars data type
        The data to convert to a ``tf.data.Dataset``.
    output_shapes:
        A (nested) structure of `tf.TensorShape` objects corresponding to
        each component of an element yielded from mars object.
    output_types:
        A (nested) structure of `tf.DType` objects corresponding to each
        component of an element yielded from mars object.
    fetch_kwargs:
        The parameters passed to the ``fetch()`` operation of mars objects.

    Returns
    -------
    tf.data.Dataset

    Examples
    --------
    >>> # convert a tensor to tf.data.Dataset.
    >>> data = mt.tensor([[1, 2], [3, 4]])
    >>> dataset = gen_tensorflow_dataset(data)
    >>> list(dataset.as_numpy_iterator())
    [array([1, 2]), array([3, 4])]
    >>> dataset.element_spec
    TensorSpec(shape=(2,), dtype=tf.int64, name=None)

    >>> # convert a tuple of tensors to tf.data.Dataset.
    >>> data1 = mt.tensor([1, 2]); data2 = mt.tensor([3, 4]); data3 = mt.tensor([5, 6])
    >>> dataset = gen_tensorflow_dataset((data1, data2, data3))
    >>> list(dataset.as_numpy_iterator())
    [(1, 3, 5), (2, 4, 6)]
    """
    mars_dataset = MarsDataset(
        tensors,
        output_shapes=output_shapes,
        output_types=output_types,
        fetch_kwargs=fetch_kwargs,
    )
    return mars_dataset.to_tf()