Integrate with TensorFlow#

This introduction gives a brief tour of how to integrate TensorFlow with Mars.

This tutorial is based on TensorFlow 2.0.


If you are trying to use Mars on a single machine, e.g. on your laptop, make sure TensorFlow is installed.

You can install TensorFlow via pip:

pip install tensorflow

Visit the TensorFlow installation guide for more information.

On the other hand, if you are about to use Mars on a cluster, make sure TensorFlow is installed on each worker.
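A quick way to confirm this requirement on a given machine is to check whether the tensorflow package can be found. The helper name below is hypothetical; this is a minimal sketch using only the Python standard library, not a Mars API.

```python
import importlib.util


def tensorflow_available() -> bool:
    """Return True if the `tensorflow` package can be found on this machine."""
    # find_spec locates the package without importing it
    return importlib.util.find_spec("tensorflow") is not None


if __name__ == "__main__":
    print("TensorFlow installed:", tensorflow_available())
```

Running the same check on every worker is one way to catch a missing installation before submitting a script.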

Prepare data#

The dataset used here is the ionosphere dataset; click the link to download the data.

Prepare TensorFlow script#

Now we create a Python file which contains the TensorFlow logic.

import os

import mars.dataframe as md
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense

def prepare_data():
    df = md.read_csv('', header=None)

    # split into input and output columns
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    y = df.iloc[:, -1].to_tensor()

    # convert Mars tensor to numpy ndarray
    X, y = X.to_numpy(), y.to_numpy()

    # encode string to integer
    y = LabelEncoder().fit_transform(y)

    # split into train and test datasets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
    print(X_train.size, X_test.size, y_train.size, y_test.size)

    return X_train, X_test, y_train, y_test

def get_model(n_features):
    model = Sequential()
    model.add(Dense(10, activation='relu', kernel_initializer='he_normal',
                    input_shape=(n_features,)))
    model.add(Dense(8, activation='relu', kernel_initializer='he_normal'))
    model.add(Dense(1, activation='sigmoid'))

    # compile the model
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model

def train():
    X_train, X_test, y_train, y_test = prepare_data()

    model = get_model(X_train.shape[1])

    # fit model
    model.fit(X_train, y_train, epochs=150, batch_size=32, verbose=0)
    # evaluate
    loss, acc = model.evaluate(X_test, y_test, verbose=0)
    print('Test accuracy: %.3f' % acc)

if __name__ == '__main__':
    if 'TF_CONFIG' in os.environ:
        # distributed TensorFlow
        multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

        with multiworker_strategy.scope():
            train()
    else:
        train()

Mars libraries such as DataFrame can be used directly to process massive data and accelerate preprocessing.

Run TensorFlow script via Mars#

The TensorFlow script can now be submitted via run_tensorflow_script().

In [1]: from mars.learn.contrib.tensorflow import run_tensorflow_script

In [2]: run_tensorflow_script('', n_workers=1)
2020-04-28 15:40:38.284763: I tensorflow/core/platform/] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
sh: sysctl: command not found
2020-04-28 15:40:38.301635: I tensorflow/compiler/xla/service/] XLA service 0x7fd29699c020 executing computations on platform Host. Devices:
2020-04-28 15:40:38.301656: I tensorflow/compiler/xla/service/]   StreamExecutor device (0): Host, Default Version
2020-04-28 15:40:38.303779: I tensorflow/core/distributed_runtime/rpc/] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2221}
2020-04-28 15:40:38.304476: I tensorflow/core/distributed_runtime/rpc/] Started server with target: grpc://localhost:2221
7990 3944 235 116
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
WARNING:tensorflow:ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
WARNING:tensorflow:ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.
Test accuracy: 0.931
2020-04-28 15:40:45.906407: W tensorflow/core/common_runtime/eager/] Unable to destroy server_ object, so releasing instead. Servers don't support clean shutdown.
Out[2]: {'status': 'ok'}
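The line `7990 3944 235 116` in the output comes from the print call in prepare_data(), which reports the element counts of the four arrays. The sketch below reproduces those numbers, assuming the ionosphere dataset has 351 rows and 34 feature columns and that the test share is rounded up to a whole number of rows, as scikit-learn's train_test_split does.

```python
import math

# Assumptions: the ionosphere dataset has 351 rows and 34 feature columns,
# and the test share is rounded up to a whole number of rows.
n_samples, n_features = 351, 34
test_size = 0.33

n_test = math.ceil(test_size * n_samples)   # rows held out for testing
n_train = n_samples - n_test                # remaining rows used for training

# X arrays are 2-D (rows x features), y arrays are 1-D (rows)
sizes = (n_train * n_features, n_test * n_features, n_train, n_test)
print(*sizes)  # 7990 3944 235 116
```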

Distributed training or inference#

Refer to Run on Clusters section for deployment, or Run on Kubernetes section for running Mars on Kubernetes.

As you can tell from the script above, Mars sets the environment variable TF_CONFIG automatically. TF_CONFIG contains cluster and task information, so you don't need to worry about the distributed settings; all you need to do is choose a proper distribution strategy.
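To see what a training script can read from TF_CONFIG, the sketch below parses the variable with the standard library. The sample value is illustrative only; it follows the JSON layout TensorFlow documents for TF_CONFIG (a `cluster` mapping and a `task` entry), and the helper name `describe_tf_config` is hypothetical.

```python
import json
import os


def describe_tf_config(environ=os.environ):
    """Return (worker_addresses, task_type, task_index), or None if unset."""
    raw = environ.get("TF_CONFIG")
    if raw is None:
        return None  # TF_CONFIG is not set, so this is a local run
    config = json.loads(raw)
    workers = config.get("cluster", {}).get("worker", [])
    task = config.get("task", {})
    return workers, task.get("type"), task.get("index")


# Illustrative value only; in a real run Mars fills in the addresses.
sample = {
    "TF_CONFIG": json.dumps({
        "cluster": {"worker": ["localhost:2221", "localhost:2222"]},
        "task": {"type": "worker", "index": 0},
    })
}
print(describe_tf_config(sample))
```

A script can use the same check as the `'TF_CONFIG' in os.environ` test shown earlier to decide between a distributed strategy and a plain local run.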

Once a cluster exists, you can either set the session as the default, in which case the training and prediction shown above will be submitted to the cluster, or specify session=*** explicitly.

# A cluster has been configured, and web UI is started on <web_ip>:<web_port>
import mars
# set the session as the default one
sess = mars.new_session('http://<web_ip>:<web_port>')

# submitted to cluster by default
run_tensorflow_script('', n_workers=1)

# Or, session could be specified as well
run_tensorflow_script('', n_workers=1, session=sess)

Use gen_tensorflow_dataset#

You can convert Mars data (mars.tensor.Tensor, mars.dataframe.DataFrame, mars.dataframe.Series) to a TensorFlow dataset with gen_tensorflow_dataset(). It also supports numpy.ndarray, pandas.DataFrame, and pandas.Series.

In [1]: data = mt.tensor([[1, 2], [3, 4]])
In [2]: dataset = gen_tensorflow_dataset(data)
In [3]: list(dataset.as_numpy_iterator())
Out[3]: [array([1, 2]), array([3, 4])]

In [1]: data1 = mt.tensor([1, 2]); data2 = mt.tensor([3, 4]); data3 = mt.tensor([5, 6])
In [2]: dataset = gen_tensorflow_dataset((data1, data2, data3))
In [3]: list(dataset.as_numpy_iterator())
Out[3]: [(1, 3, 5), (2, 4, 6)]
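The two outputs above follow the same slicing rule: each dataset element is one entry along the first axis of the input, and a tuple of inputs is sliced in lockstep. The pure-Python sketch below only mimics that element layout with lists; it is an analogy for reading the outputs, not how gen_tensorflow_dataset is implemented, and the function name is hypothetical.

```python
def slice_first_axis(data):
    """Yield one element per entry along the first axis."""
    if isinstance(data, tuple):
        # several inputs: pair up the i-th entry of each input
        yield from zip(*data)
    else:
        # single input: one element per row
        yield from data


single = list(slice_first_axis([[1, 2], [3, 4]]))
paired = list(slice_first_axis(([1, 2], [3, 4], [5, 6])))
print(single)  # [[1, 2], [3, 4]]
print(paired)  # [(1, 3, 5), (2, 4, 6)]
```

This is why passing `(X, y)` as a tuple yields `(features, label)` pairs, the layout Keras expects for training.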

Now you can preprocess the data via Mars and pass the data to the script.

import mars.dataframe as md
from sklearn.preprocessing import LabelEncoder
from mars.learn.contrib.tensorflow import run_tensorflow_script
# assumption: Mars's own train_test_split is intended here, since X is a Mars DataFrame
from mars.learn.model_selection import train_test_split

df = md.read_csv('', header=None)
X = df.iloc[:, :-1].astype('float32')
y = df.iloc[:, -1]
y = LabelEncoder().fit_transform(y.execute().fetch())
X_train, X_test, y_train, y_test = train_test_split(X.execute(), y, test_size=0.33)

run_tensorflow_script(
    "", n_workers=2, data={'X_train': X_train, 'y_train': y_train,
    'X_test': X_test, 'y_test': y_test}, session=sess)
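The keys of the dictionary passed through data= are the names the training script later looks up through globals(). As an analogy only (this is not Mars's implementation), the standard library's runpy.run_path can run a script with pre-populated globals in a similar way. The script content, file name, and values below are hypothetical.

```python
import os
import runpy
import tempfile

# A tiny stand-in script that reads an injected variable from its globals
script_source = "result = sum(globals()['X_train'])\n"

with tempfile.TemporaryDirectory() as tmp_dir:
    script_path = os.path.join(tmp_dir, "demo_script.py")
    with open(script_path, "w") as f:
        f.write(script_source)

    # init_globals pre-populates the script's namespace before it runs
    namespace = runpy.run_path(
        script_path, init_globals={"X_train": [1, 2, 3]}, run_name="__main__"
    )

print(namespace["result"])  # 6
```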

import os

from mars.learn.contrib.tensorflow import gen_tensorflow_dataset
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense

def get_model(n_features):
    model = Sequential()
    model.add(Dense(10, activation='relu', kernel_initializer='he_normal',
                    input_shape=(n_features,)))
    model.add(Dense(8, activation='relu', kernel_initializer='he_normal'))
    model.add(Dense(1, activation='sigmoid'))

    # compile the model
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model

def train(X_train, X_test, y_train, y_test):
    model = get_model(X_train.shape[1])

    db_train = gen_tensorflow_dataset((X_train, y_train))
    db_train = db_train.batch(32)
    db_test = gen_tensorflow_dataset((X_test, y_test))
    db_test = db_test.batch(32)

    # fit model
    model.fit(db_train, epochs=150)
    # evaluate
    loss, acc = model.evaluate(db_test)
    print('Test accuracy: %.3f' % acc)

if __name__ == '__main__':
    X_train = globals()['X_train']
    y_train = globals()['y_train']
    X_test = globals()['X_test']
    y_test = globals()['y_test']

    if 'TF_CONFIG' in os.environ:
        # distributed TensorFlow
        multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

        with multiworker_strategy.scope():
            train(X_train, X_test, y_train, y_test)
    else:
        train(X_train, X_test, y_train, y_test)


Epoch 1/150
Epoch 1/150
      1/Unknown - 1s 996ms/step - loss: 0.7825 - accuracy: 0.2500      1/Unknown - 1s 996ms/step - loss: 0.7825 - accura
      6/Unknown - 3s 362ms/step - loss: 0.7388 - accuracy: 0.3438      6/Unknown - 3s 363ms/step - loss: 0.7388 - accura
      7/Unknown - 3s 358ms/step - loss: 0.7404 - accuracy: 0.3259      7/Unknown - 3s 358ms/step - loss: 0.7404 - accura
      8/Unknown - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277      8/Unknown - 3s 324ms/step - loss: 0.7368 - accura
8/8 [==============================] - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277
8/8 [==============================] - 3s 324ms/step - loss: 0.7368 - accuracy: 0.3277
Epoch 2/150
Epoch 2/150
8/8 [==============================] - ETA: 0s - loss: 0.6775 - accuracy: 0.49798/8 [==============================] - E
8/8 [==============================] - 3s 314ms/step - loss: 0.6775 - accuracy: 0.4979
8/8 [==============================] - 3s 314ms/step - loss: 0.6775 - accuracy: 0.4979
Epoch 3/150
Epoch 3/150
Epoch 150/150
Epoch 150/150
2/8 [======>.......................] - ETA: 2s - loss: 0.0210 - accuracy: 1.00002/8 [======>.......................] - E
3/8 [==========>...................] - ETA: 1s - loss: 0.0220 - accuracy: 1.00003/8 [==========>...................] - E
8/8 [==============================] - ETA: 0s - loss: 0.0319 - accuracy: 0.99578/8 [==============================] - E
8/8 [==============================] - 3s 351ms/step - loss: 0.0319 - accuracy: 0.9957
8/8 [==============================] - 3s 351ms/step - loss: 0.0319 - accuracy: 0.9957

Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
       4/Unknown - 3s 380ms/step - loss: 0.2354 - accuracy: 0.9138      4/Unknown - 3s 380ms/step - loss: 0.2354 - accura
4/4 [==============================] - 3s 381ms/step - loss: 0.2354 - accuracy: 0.9138
4/4 [==============================] - 3s 381ms/step - loss: 0.2354 - accuracy: 0.9138
Test accuracy: 0.914
Test accuracy: 0.914
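The auto-sharding warning in the output suggests switching the dataset's auto_shard_policy to DATA. A minimal sketch of that suggestion follows. The helper name is hypothetical, and TensorFlow is imported inside the function only so the snippet can be loaded on a machine that merely submits scripts.

```python
def shard_by_data(dataset):
    """Return `dataset` with its auto-shard policy set to DATA."""
    import tensorflow as tf  # imported lazily; must be installed where this runs

    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.DATA
    )
    # with_options returns a new dataset carrying the given options
    return dataset.with_options(options)
```

In the training script above, this could be applied after batching, for example `db_train = shard_by_data(db_train)` and likewise for `db_test`.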