Integrate with TensorFlow

This guide shows how to integrate TensorFlow with Mars.

This guide is based on TensorFlow 2.0.


If you are trying Mars on a single machine, such as your laptop, make sure TensorFlow is installed.

Install TensorFlow via pip:

pip install tensorflow

See the TensorFlow installation guide for more information.

On the other hand, if you plan to use Mars on a cluster, make sure TensorFlow is installed on every worker.
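A quick way to confirm the installation on a machine is to ask Python for the installed package version. The sketch below uses only the standard library; the helper name installed_version is made up for this example, and it assumes the distribution is published under the name tensorflow (variants such as tensorflow-cpu would need their own name).

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a package, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("tensorflow")
if version is None:
    print("tensorflow is not installed; run: pip install tensorflow")
else:
    print("tensorflow", version)
```

On a cluster, the same check would have to be run on every worker, since each worker imports TensorFlow on its own.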


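This example uses the ionosphere dataset. Download the data file and save it as ionosphere.data, the file name that the script reads.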

Write the TensorFlow script

Now create a Python file named tf_demo.py that contains the TensorFlow logic.

import os

import mars.dataframe as md
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense

def prepare_data():
    df = md.read_csv('ionosphere.data', header=None)

    # split into input and output columns
    X = df.iloc[:, :-1].to_tensor().astype('float32')
    y = df.iloc[:, -1].to_tensor()

    # convert Mars tensor to numpy ndarray
    X, y = X.to_numpy(), y.to_numpy()

    # encode string to integer
    y = LabelEncoder().fit_transform(y)

    # split into train and test datasets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
    print(X_train.size, X_test.size, y_train.size, y_test.size)

    return X_train, X_test, y_train, y_test

def get_model(n_features):
    model = Sequential()
    model.add(Dense(10, activation='relu', kernel_initializer='he_normal',
                    input_shape=(n_features,)))
    model.add(Dense(8, activation='relu', kernel_initializer='he_normal'))
    model.add(Dense(1, activation='sigmoid'))

    # compile the model
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return model

def train():
    X_train, X_test, y_train, y_test = prepare_data()

    model = get_model(X_train.shape[1])

    # fit model
    model.fit(X_train, y_train, epochs=150, batch_size=32, verbose=0)
    # evaluate
    loss, acc = model.evaluate(X_test, y_test, verbose=0)
    print('Test accuracy: %.3f' % acc)

if __name__ == '__main__':
    if 'TF_CONFIG' in os.environ:
        # distributed TensorFlow
        multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

        with multiworker_strategy.scope():
            train()
    else:
        # no TF_CONFIG, so train in a single process
        train()

Mars libraries such as Mars DataFrame can be used directly in the script to process large-scale data and speed up preprocessing.
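To make the preprocessing in prepare_data concrete, here is a minimal standard-library sketch of the same two steps: splitting each row into features and label, then encoding the string labels as integers. The two sample rows are made up and shortened to four feature columns, and split_and_encode is a hypothetical helper, not part of Mars or scikit-learn.

```python
import csv
import io

# Two made-up rows in the layout of ionosphere.data:
# numeric feature columns followed by a string label ('g' or 'b').
SAMPLE = "1,0,0.99539,-0.05889,g\n1,0,1.00000,-0.18829,b\n"


def split_and_encode(text):
    rows = list(csv.reader(io.StringIO(text)))
    # All columns except the last are features.
    features = [[float(value) for value in row[:-1]] for row in rows]
    # The last column is the class label.
    labels = [row[-1] for row in rows]
    # Map each distinct label to its position in sorted order,
    # the same ordering LabelEncoder uses ('b' -> 0, 'g' -> 1).
    classes = sorted(set(labels))
    encoded = [classes.index(label) for label in labels]
    return features, encoded, classes


X, y, classes = split_and_encode(SAMPLE)
print(classes, y)  # ['b', 'g'] [1, 0]
```

In tf_demo.py the same work is done by Mars DataFrame and LabelEncoder, which scale to data that a plain Python loop could not handle.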

Run the TensorFlow script via Mars

The TensorFlow script can now be submitted via run_tensorflow_script().

In [1]: from mars.learn.contrib.tensorflow import run_tensorflow_script

In [2]: run_tensorflow_script('tf_demo.py', n_workers=1)
2020-04-28 15:40:38.284763: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
sh: sysctl: command not found
2020-04-28 15:40:38.301635: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7fd29699c020 executing computations on platform Host. Devices:
2020-04-28 15:40:38.301656: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): Host, Default Version
2020-04-28 15:40:38.303779: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2221}
2020-04-28 15:40:38.304476: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:365] Started server with target: grpc://localhost:2221
7990 3944 235 116
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
WARNING:tensorflow:ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.
WARNING:tensorflow:`eval_fn` is not passed in. The `worker_fn` will be used if an "evaluator" task exists in the cluster.
WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.
WARNING:tensorflow:ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.
Test accuracy: 0.931
2020-04-28 15:40:45.906407: W tensorflow/core/common_runtime/eager/context.cc:290] Unable to destroy server_ object, so releasing instead. Servers don't support clean shutdown.
Out[2]: {'status': 'ok'}
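The line 7990 3944 235 116 in the output above is printed by prepare_data and lists the element counts of X_train, X_test, y_train and y_test. The arithmetic can be reproduced as follows, assuming the data file holds 351 rows with 34 feature columns (the values implied by the printed sizes) and that the test share is rounded up to a whole number of rows. The function split_sizes is a hypothetical helper written for this illustration.

```python
import math


def split_sizes(n_samples, n_features, test_size):
    # Assumption: the test share is rounded up, the rest goes to training.
    n_test = math.ceil(test_size * n_samples)
    n_train = n_samples - n_test
    # X arrays hold rows * features elements, y arrays hold one element per row.
    return n_train * n_features, n_test * n_features, n_train, n_test


print(*split_sizes(351, 34, 0.33))  # 7990 3944 235 116
```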


For deployment, refer to the Run on Clusters section; for running on Kubernetes, refer to the Run on Kubernetes section.

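As tf_demo.py shows, Mars sets the TF_CONFIG environment variable automatically. TF_CONFIG holds the cluster information, so all you need to do is choose an appropriate distribution strategy.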
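For orientation, TF_CONFIG is a JSON string with a cluster entry (job names mapped to lists of addresses) and a task entry (the type and index of the current process). The sketch below parses such a value with the standard library. The sample value is written by hand to match the single worker on localhost:2221 seen in the log earlier; the exact string Mars generates is not shown in this guide, and describe_tf_config is a hypothetical helper.

```python
import json


def describe_tf_config(environ):
    """Summarize TF_CONFIG from a mapping such as os.environ, or return None if unset."""
    raw = environ.get("TF_CONFIG")
    if raw is None:
        return None
    config = json.loads(raw)
    workers = config.get("cluster", {}).get("worker", [])
    task = config.get("task", {})
    return {
        "n_workers": len(workers),
        "task_type": task.get("type"),
        "task_index": task.get("index"),
    }


# Hand-written sample value, an assumption for illustration only.
example_env = {
    "TF_CONFIG": json.dumps({
        "cluster": {"worker": ["localhost:2221"]},
        "task": {"type": "worker", "index": 0},
    })
}
print(describe_tf_config(example_env))
# {'n_workers': 1, 'task_type': 'worker', 'task_index': 0}
```

Inside a script launched by Mars, passing os.environ instead of example_env would report the cluster that the script is actually running in.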

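Once a cluster exists, you can either set its session as the default, in which case training and prediction are submitted to the cluster automatically, or specify the session explicitly via session=***.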

# A cluster has been configured, and web UI is started on <web_ip>:<web_port>
from mars.learn.contrib.tensorflow import run_tensorflow_script
from mars.session import new_session
# set the session as the default one
sess = new_session('http://<web_ip>:<web_port>').as_default()

# submitted to cluster by default
run_tensorflow_script('tf_demo.py', n_workers=1)

# Or, session could be specified as well
run_tensorflow_script('tf_demo.py', n_workers=1, session=sess)