Mars 可在 YARN 集群中部署。你可以使用 mars.deploy.yarn 在 Hadoop 环境中部署 Mars 集群。
mars.deploy.yarn
Mars 使用 Skein 完成在 YARN 集群中的部署。这个包解决了 Python 代码使用 YARN 应用的 Java 接口的问题。
在 YARN 中启动 Mars 之前,你首先需要确认你的环境。由于 Skein 仅支持 Linux,你需要在一个 Linux 客户端中进行下面的操作,否则可能需要自行手动修复和编译一系列依赖包。你也需要在客户端安装 Skein 包。使用 Conda 的安装步骤为
conda install -c conda-forge skein
使用 pip 的安装步骤为
pip install skein
此后你需要确认你的 YARN 集群中安装的 Python 环境。如果集群中的 Python 已经安装了所有的依赖包,这将帮助你节约大量启动 Mars 集群的时间。否则你需要在本地打包 Python 环境并提供给 Mars。
如果你使用的是 Conda,你可以使用 conda-pack 打包你的环境:
conda activate local-env conda install -c conda-forge conda-pack conda-pack
如果你使用的是 Python 虚拟环境,你可以使用 venv-pack 打包你的 Python 虚拟环境:
source local-env/bin/activate pip install venv-pack venv-pack
这两个工具都会输出一个 tar.gz 压缩包,你可以用它来部署你的 Mars 集群。
tar.gz
此后,你可以使用下面的代码启动 Mars 集群。当你使用集群 Conda 环境、Python 虚拟环境、Python 执行文件路径或者依照上面的方法制作的 Python 环境包,可以根据代码中的注释选择相应的行:
import os from mars.deploy.yarn import new_cluster # specify location of Hadoop and JDK on client side os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk' os.environ['HADOOP_HOME'] = '/usr/local/hadoop' os.environ['PATH'] = '/usr/local/hadoop:' + os.environ['PATH'] # use a conda environment at /path/to/remote/conda/env cluster = new_cluster(enviromnent='conda:///path/to/remote/conda/env') # use a virtual environment at /path/to/remote/virtual/env cluster = new_cluster(enviromnent='venv:///path/to/remote/virtual/env') # use a remote python executable cluster = new_cluster(enviromnent='python:///path/to/remote/python') # use a local packed environment archive cluster = new_cluster(enviromnent='path/to/local/env/pack.tar.gz') # get web endpoint, may be used elsewhere from mars.session import Session print(Session.default_or_local().endpoint) # new cluster will start a session and set it as default one # execute will then run in the local cluster a = mt.random.rand(10, 10) a.dot(a.T).execute() # after all jobs executed, you can turn off the cluster cluster.stop()
new_cluster 函数提供若干可用于定制集群的参数。你可以使用 app_name 参数定义 YARN 应用的名称,timeout 参数定义创建集群的超时时间。同时,该函数还提供调整集群规模的功能。
new_cluster
app_name
timeout
Scheduler 相关参数:
参数
描述
scheduler_num
Number of schedulers in the cluster, 1 by default
每个 Scheduler 的 CPU 数目
Number of CPUs for every scheduler
scheduler_mem
每个 Scheduler 的内存大小,可使用字节数或带单位的大小,例如 1g
1g
scheduler_extra_env
在 Scheduler 中额外增加的环境变量,以 dict 表示
Worker 相关参数:
worker_num
集群中 Worker 的数目,默认为 1
worker_cpu
每个 Worker 的 CPU 数目
worker_mem
每个 Worker 的内存大小,可使用字节数或带单位的大小,例如 1g
worker_spill_paths
在宿主机上用于 Worker spill 的路径列表
worker_cache_mem
Worker 中共享内存的大小或者占比。关于 Mars Worker 中的内存管理细节可以参考 内存调优 章节。
min_worker_num
new_cluster 结束执行时所需的最小可用 Worker 个数,默认为 worker_num。
worker_extra_env
在 Worker 中额外增加的环境变量,以 dict 表示
Web 服务相关参数:
web_num
每个 Web 服务的 CPU 数目,默认为1
web_cpu
每个 Web 服务的 CPU 数目
web_mem
每个 Web 服务的内存大小,可使用字节数或带单位的大小,例如 1g
web_extra_env
在 Web 服务中额外增加的环境变量,以 dict 表示
例如,如果你需要创建一个包含单个 Scheduler、单个 Web 服务以及 100 个 Worker 的 Mars 集群,每个 Worker 拥有 4 核 16GB 内存,当 95 个 Worker 可用时认为整个集群可用,你可以使用下面的代码:
import os from mars.deploy.yarn import new_cluster os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk' os.environ['HADOOP_HOME'] = '/usr/local/hadoop' cluster = new_cluster('path/to/env/pack.tar.gz', scheduler_num=1, web_num=1, worker_num=100, worker_cpu=4, worker_mem='16g', min_worker_num=95)