在 YARN 中部署

Mars 可在 YARN 集群中部署。你可以使用 mars.deploy.yarn 在 Hadoop 环境中部署 Mars 集群。

基本步骤

Mars 使用 Skein 完成在 YARN 集群中的部署。这个包解决了 Python 代码使用 YARN 应用的 Java 接口的问题。

在 YARN 中启动 Mars 之前,你首先需要确认你的环境。由于 Skein 仅支持 Linux,你需要在一个 Linux 客户端中进行下面的操作,否则可能需要自行手动修复和编译一系列依赖包。你也需要在客户端安装 Skein 包。使用 Conda 的安装步骤为

conda install -c conda-forge skein

使用 pip 的安装步骤为

pip install skein

此后你需要确认你的 YARN 集群中安装的 Python 环境。如果集群中的 Python 已经安装了所有的依赖包,这将帮助你节约大量启动 Mars 集群的时间。否则你需要在本地打包 Python 环境并提供给 Mars。

如果你使用的是 Conda,你可以使用 conda-pack 打包你的环境:

conda activate local-env
conda install -c conda-forge conda-pack
conda-pack

如果你使用的是 Python 虚拟环境,你可以使用 venv-pack 打包你的 Python 虚拟环境:

source local-env/bin/activate
pip install venv-pack
venv-pack

这两个工具都会输出一个 tar.gz 压缩包,你可以用它来部署你的 Mars 集群。

此后,你可以使用下面的代码启动 Mars 集群。当你使用集群 Conda 环境、Python 虚拟环境、Python 执行文件路径或者依照上面的方法制作的 Python 环境包,可以根据代码中的注释选择相应的行:

import os
from mars.deploy.yarn import new_cluster

# specify location of Hadoop and JDK on client side
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'
os.environ['PATH'] = '/usr/local/hadoop:' + os.environ['PATH']

# use a conda environment at /path/to/remote/conda/env
cluster = new_cluster(environment='conda:///path/to/remote/conda/env')

# use a virtual environment at /path/to/remote/virtual/env
cluster = new_cluster(environment='venv:///path/to/remote/virtual/env')

# use a remote python executable
cluster = new_cluster(environment='python:///path/to/remote/python')

# use a local packed environment archive
cluster = new_cluster(environment='path/to/local/env/pack.tar.gz')

# get web endpoint, may be used elsewhere
print(cluster.session.endpoint)

# new cluster will start a session and set it as default one
# execute will then run in the local cluster
a = mt.random.rand(10, 10)
a.dot(a.T).execute()

# after all jobs executed, you can turn off the cluster
cluster.stop()

定制集群

new_cluster 函数提供若干可用于定制集群的参数。你可以使用 app_name 参数定义 YARN 应用的名称,timeout 参数定义创建集群的超时时间。同时,该函数还提供调整集群规模的功能。

Supervisor 相关参数:

参数

描述

supervisor_num

每个 Supervisor 的 CPU 数目,默认为 1

supervisor_cpu

每个 Supervisor 的 CPU 数目

supervisor_mem

每个 Supervisor 的内存大小,可使用字节数或带单位的大小,例如 1g

supervisor_extra_env

在 Supervisor 中额外增加的环境变量,以 dict 表示

Worker 相关参数:

参数

描述

worker_num

集群中 Worker 的数目,默认为 1

worker_cpu

每个 Worker 的 CPU 数目,此参数为必需

worker_mem

每个 Worker 的内存大小,可使用字节数或带单位的大小,例如 1g,此参数为必需

worker_spill_paths

在宿主机上用于 Worker spill 的路径列表

worker_cache_mem

Worker 中共享内存的大小或者占比。关于 Mars Worker 中的内存管理细节可以参考 内存调优 章节。

min_worker_num

new_cluster 结束执行时所需的最小可用 Worker 个数,默认为 worker_num

worker_extra_env

在 Worker 中额外增加的环境变量,以 dict 表示

例如,如果你需要创建一个包含单个 Supervisor 和 100 个 Worker 的 Mars 集群,每个 Worker 拥有 4 核 16GB 内存,当 95 个 Worker 可用时认为整个集群可用,你可以使用下面的代码:

import os
from mars.deploy.yarn import new_cluster

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'

cluster = new_cluster('path/to/env/pack.tar.gz', supervisor_num=1, web_num=1,
                      worker_num=100, worker_cpu=4, worker_mem='16g',
                      min_worker_num=95)