Integrate with TensorFlow

This introduction gives a brief tour of how to integrate TensorFlow with Mars.

This tutorial is based on TensorFlow 2.0.

Installation

If you are trying to use Mars on a single machine, e.g. on your laptop, make sure TensorFlow is installed.

You can install TensorFlow via pip:

pip install tensorflow

Visit the TensorFlow installation guide for more information.

On the other hand, if you are going to use Mars on a cluster, make sure TensorFlow is installed on each worker.
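
A quick sanity check that TensorFlow is importable on a node is a one-liner like the following (this is just a generic check, not a Mars-specific command):

python -c "import tensorflow as tf; print(tf.__version__)"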

Prepare data

The dataset used here is the ionosphere dataset; follow the link to download the data.
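
If you prefer to fetch the file programmatically, here is a minimal sketch that downloads it from the UCI Machine Learning Repository (the URL is an assumption and may change; adjust it if the file has moved):

import urllib.request

# download the ionosphere dataset into the working directory
url = ('https://archive.ics.uci.edu/ml/machine-learning-databases/'
       'ionosphere/ionosphere.data')
urllib.request.urlretrieve(url, 'ionosphere.data')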

Prepare TensorFlow script

Now we create a Python file called tf_demo.py which contains the TensorFlow logic.

import os

import mars.dataframe as md
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense


def prepare_data():
    df = md.read_csv('ionosphere.data', header=None)

    # split into input and output columns
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    y = df.iloc[:, -1].to_tensor()

    # convert Mars tensor to numpy ndarray
    X, y = X.to_numpy(), y.to_numpy()

    # encode string to integer
    y = LabelEncoder().fit_transform(y)

    # split into train and test datasets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
    print(X_train.size, X_test.size, y_train.size, y_test.size)

    return X_train, X_test, y_train, y_test


def get_model(n_features):
    model = Sequential()
    model.add(Dense(10, activation='relu', kernel_initializer='he_normal',
                    input_shape=(n_features,)))
    model.add(Dense(8, activation='relu', kernel_initializer='he_normal'))
    model.add(Dense(1, activation='sigmoid'))

    # compile the model
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model


def train():
    X_train, X_test, y_train, y_test = prepare_data()

    model = get_model(X_train.shape[1])

    # fit model
    model.fit(X_train, y_train, epochs=150, batch_size=32, verbose=0)
    # evaluate
    loss, acc = model.evaluate(X_test, y_test, verbose=0)
    print('Test accuracy: %.3f' % acc)


if __name__ == '__main__':
    if 'TF_CONFIG' in os.environ:
        # distributed TensorFlow
        multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

        with multiworker_strategy.scope():
            train()
    else:
        train()

Mars libraries, including Mars DataFrame, can be used directly to process massive data and accelerate preprocessing.
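
For instance, a minimal sketch of standalone preprocessing with Mars DataFrame (the steps below are illustrative and not part of the demo; the API mirrors pandas):

import mars.dataframe as md

df = md.read_csv('ionosphere.data', header=None)
# drop rows containing missing values and inspect summary statistics;
# Mars runs these pandas-style operations lazily and in parallel
df = df.dropna()
print(df.describe().execute())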

Run TensorFlow script via Mars

The TensorFlow script can now be submitted via run_tensorflow_script().

In [1]: from mars.learn.contrib.tensorflow import run_tensorflow_script

In [2]: run_tensorflow_script('tf_demo.py', n_workers=1)
2020-04-28 15:40:38.284763: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
sh: sysctl: command not found
2020-04-28 15:40:38.301635: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7fd29699c020 executing computations on platform Host. Devices:
2020-04-28 15:40:38.301656: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): Host, Default Version
2020-04-28 15:40:38.303779: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2221}
2020-04-28 15:40:38.304476: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:365] Started server with target: grpc://localhost:2221
7990 3944 235 116
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
WARNING:tensorflow:ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
WARNING:tensorflow:ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.
Test accuracy: 0.931
2020-04-28 15:40:45.906407: W tensorflow/core/common_runtime/eager/context.cc:290] Unable to destroy server_ object, so releasing instead. Servers don't support clean shutdown.
Out[2]: {'status': 'ok'}

Distributed training or inference

Refer to the Run on Clusters section for deployment, or the Run on Kubernetes section for running Mars on Kubernetes.

As you can tell from tf_demo.py, Mars sets the environment variable TF_CONFIG automatically. TF_CONFIG contains cluster and task information, so you don't need to worry about the distributed setup; all you need to do is choose a proper distribution strategy.
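
For reference, TF_CONFIG follows the standard TensorFlow cluster specification. Setting it by hand would look roughly like the sketch below; the hosts and ports are purely illustrative, since Mars fills them in for you:

import json
import os

# illustrative only: Mars sets TF_CONFIG automatically on each worker
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {'worker': ['host1:2221', 'host2:2222']},
    'task': {'type': 'worker', 'index': 0},
})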

Once a cluster exists, you can either set the session as the default, in which case the training and prediction shown above are submitted to the cluster, or specify session=*** explicitly.

# A cluster has been configured, and web UI is started on <web_ip>:<web_port>
import mars
# set the session as the default one
sess = mars.new_session('http://<web_ip>:<web_port>')

# submitted to cluster by default
run_tensorflow_script('tf_demo.py', n_workers=1)

# or, the session can be specified explicitly
run_tensorflow_script('tf_demo.py', n_workers=1, session=sess)

Use gen_tensorflow_dataset

You can convert Mars data (mars.tensor.Tensor, mars.dataframe.DataFrame, mars.dataframe.Series) to a tf.data.Dataset via gen_tensorflow_dataset(). It also supports numpy.ndarray, pandas.DataFrame and pandas.Series.

In [1]: import mars.tensor as mt
In [2]: from mars.learn.contrib.tensorflow import gen_tensorflow_dataset
In [3]: data = mt.tensor([[1, 2], [3, 4]])
In [4]: dataset = gen_tensorflow_dataset(data)
In [5]: list(dataset.as_numpy_iterator())
Out[5]: [array([1, 2]), array([3, 4])]

In [6]: data1 = mt.tensor([1, 2]); data2 = mt.tensor([3, 4]); data3 = mt.tensor([5, 6])
In [7]: dataset = gen_tensorflow_dataset((data1, data2, data3))
In [8]: list(dataset.as_numpy_iterator())
Out[8]: [(1, 3, 5), (2, 4, 6)]

Now you can preprocess the data via Mars and pass it to the script.

import mars.dataframe as md
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from mars.learn.contrib.tensorflow import run_tensorflow_script


df = md.read_csv('ionosphere.data', header=None)
X = df.iloc[:, :-1].astype('float32')
y = df.iloc[:, -1]
y = LabelEncoder().fit_transform(y.execute().fetch())
X_train, X_test, y_train, y_test = train_test_split(X.execute(), y, test_size=0.33)

# `sess` is the cluster session created above via mars.new_session()
run_tensorflow_script(
    'tf_demo.py', n_workers=2,
    data={'X_train': X_train, 'y_train': y_train,
          'X_test': X_test, 'y_test': y_test},
    session=sess)

tf_demo.py

import os

from mars.learn.contrib.tensorflow import gen_tensorflow_dataset
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense


def get_model(n_features):
    model = Sequential()
    model.add(Dense(10, activation='relu', kernel_initializer='he_normal',
                    input_shape=(n_features,)))
    model.add(Dense(8, activation='relu', kernel_initializer='he_normal'))
    model.add(Dense(1, activation='sigmoid'))

    # compile the model
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model


def train(X_train, X_test, y_train, y_test):
    model = get_model(X_train.shape[1])

    db_train = gen_tensorflow_dataset((X_train, y_train))
    db_train = db_train.batch(32)
    db_test = gen_tensorflow_dataset((X_test, y_test))
    db_test = db_test.batch(32)

    # fit model
    model.fit(db_train, epochs=150)
    # evaluate
    loss, acc = model.evaluate(db_test)
    print('Test accuracy: %.3f' % acc)


if __name__ == '__main__':
    # objects passed via run_tensorflow_script's `data` argument
    # are injected into the script's global namespace
    X_train = globals()['X_train']
    y_train = globals()['y_train']
    X_test = globals()['X_test']
    y_test = globals()['y_test']

    if 'TF_CONFIG' in os.environ:
        # distributed TensorFlow
        multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

        with multiworker_strategy.scope():
            train(X_train, X_test, y_train, y_test)
    else:
        train(X_train, X_test, y_train, y_test)

Result:

Epoch 1/150
Epoch 1/150
8/8 [==============================] - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277
8/8 [==============================] - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277
Epoch 2/150
Epoch 2/150
8/8 [==============================] - 3s 314ms/step - loss: 0.6775 - accuracy: 0.4979
8/8 [==============================] - 3s 314ms/step - loss: 0.6775 - accuracy: 0.4979
Epoch 3/150
Epoch 3/150
...
Epoch 150/150
Epoch 150/150
8/8 [==============================] - 3s 351ms/step - loss: 0.0319 - accuracy: 0.9957
8/8 [==============================] - 3s 351ms/step - loss: 0.0319 - accuracy: 0.9957

Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
4/4 [==============================] - 3s 381ms/step - loss: 0.2354 - accuracy: 0.9138
4/4 [==============================] - 3s 381ms/step - loss: 0.2354 - accuracy: 0.9138
Test accuracy: 0.914
Test accuracy: 0.914