在 Ray 中运行 Mars¶
Mars 与 Ray 进行了深度集成,并可以高效原生地运行在 Ray 上。
基本步骤¶
在本地安装 Ray :
pip install ray>=1.7.0
启动 Ray 集群:
import ray
ray.init()
或者使用 Ray Client 连接到一个已有的集群:
import ray
ray.init(address="ray://<head_node_host>:10001")
创建 Mars on Ray 运行时并执行计算:
import mars
import mars.tensor as mt
import mars.dataframe as md
session = mars.new_ray_session(worker_num=2, worker_mem=2 * 1024 ** 3)
mt.random.RandomState(0).rand(1000_0000, 5).sum().execute()
df = md.DataFrame(
mt.random.rand(1000_0000, 4, chunk_size=500_0000),
columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row["a"] > 0.5).show(5)
# Convert ray dataset to mars dataframe
df2 = md.read_ray_dataset(ds)
print(df2.head(5).execute())
在 Ray 集群里面独立创建 Mars on Ray运行时:
import mars
import mars.tensor as mt
cluster = mars.new_cluster_in_ray(worker_num=2, worker_mem=2 * 1024 ** 3)
连接到创建的 Mars on Ray 运行时并执行计算:
import mars
import mars.tensor as mt
session = mars.new_ray_session(address="http://ip:port", session_id="abcd", default=True)
session.execute(mt.random.RandomState(0).rand(100, 5).sum())
停止 Mars on Ray 运行时:
cluster.stop()
自定义集群¶
new_ray_session
/new_cluster_in_ray
函数提供了一些用于自定义集群的关键字参数。
Supervisor 相关参数:
参数 |
描述 |
---|---|
supervisor_mem |
Supervisor 的内存大小,单位是字节 |
Worker 相关参数:
参数 |
描述 |
---|---|
worker_num |
集群中 Worker 的数目,默认为 1 |
worker_cpu |
每个 Worker 的 CPU 数目,默认为 2 |
worker_mem |
每个 Worker 的内存大小,单位是字节,默认2G |
比如你想创建一个100个 Worker 的 Mars 集群,每个 Worker 拥有 4 核 16GB 内存,你可以使用下面的代码:
import mars
import mars.tensor as mt
cluster = mars.new_cluster_in_ray(worker_num=100, worker_cpu=4, worker_mem=16 * 1024 ** 3)