在 Ray 中运行 Mars

Mars 与 Ray 进行了深度集成,并可以高效原生地运行在 Ray 上。

基本步骤

在本地安装 Ray :

pip install ray>=1.7.0

启动 Ray 集群:

import ray
ray.init()

或者使用 Ray Client 连接到一个已有的集群:

import ray
ray.init(address="ray://<head_node_host>:10001")

创建 Mars on Ray 运行时并执行计算:

import mars
import mars.tensor as mt
import mars.dataframe as md
session = mars.new_ray_session(worker_num=2, worker_mem=2 * 1024 ** 3)
mt.random.RandomState(0).rand(1000_0000, 5).sum().execute()
df = md.DataFrame(
    mt.random.rand(1000_0000, 4, chunk_size=500_0000),
    columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row["a"] > 0.5).show(5)
# Convert ray dataset to mars dataframe
df2 = md.read_ray_dataset(ds)
print(df2.head(5).execute())

在 Ray 集群里面独立创建 Mars on Ray运行时:

import mars
import mars.tensor as mt
cluster = mars.new_cluster_in_ray(worker_num=2, worker_mem=2 * 1024 ** 3)

连接到创建的 Mars on Ray 运行时并执行计算:

import mars
import mars.tensor as mt
session = mars.new_ray_session(address="http://ip:port", session_id="abcd", default=True)
session.execute(mt.random.RandomState(0).rand(100, 5).sum())

停止 Mars on Ray 运行时:

cluster.stop()

自定义集群

new_ray_session/new_cluster_in_ray 函数提供了一些用于自定义集群的关键字参数。

Supervisor 相关参数:

参数

描述

supervisor_mem

Supervisor 的内存大小,单位是字节

Worker 相关参数:

参数

描述

worker_num

集群中 Worker 的数目,默认为 1

worker_cpu

每个 Worker 的 CPU 数目,默认为 2

worker_mem

每个 Worker 的内存大小,单位是字节,默认2G

比如你想创建一个100个 Worker 的 Mars 集群,每个 Worker 拥有 4 核 16GB 内存,你可以使用下面的代码:

import mars
import mars.tensor as mt
cluster = mars.new_cluster_in_ray(worker_num=100, worker_cpu=4, worker_mem=16 * 1024 ** 3)