Run on YARN
Mars can be deployed on YARN clusters. You can use mars.deploy.yarn to start Mars clusters in Hadoop environments.
Basic steps
Mars uses Skein to deploy itself into YARN clusters. This library bridges Java interfaces of YARN applications and Python interfaces.
Before starting Mars on YARN, check your environment first. Skein supports Linux only, so you need to work on a Linux client; otherwise you will have to fix and compile a number of packages yourself. The Skein library is also required on the client side. You may install Skein with conda
conda install -c conda-forge skein
or install with pip
pip install skein
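The client-side requirements above can be checked before attempting a deployment. The following is a minimal sketch and not part of Mars or Skein: client_problems is a hypothetical helper name, and it only tests that the operating system is Linux and that the skein package can be located.

```python
import importlib.util
import platform


def client_problems(system=None, skein_available=None):
    """Return a list of human-readable problems found on the client.

    Both arguments default to values detected on the current machine;
    passing them explicitly makes the function easy to test.
    """
    if system is None:
        system = platform.system()
    if skein_available is None:
        # find_spec returns None when the package cannot be located
        skein_available = importlib.util.find_spec("skein") is not None

    problems = []
    if system != "Linux":
        problems.append("client runs on %s, but Skein supports Linux only" % system)
    if not skein_available:
        problems.append("skein is not installed on the client")
    return problems


if __name__ == "__main__":
    for problem in client_problems():
        print("WARNING:", problem)
```

An empty list means that neither of the two checks found a problem; it does not verify the Hadoop or JDK installation.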
Then you need to check the Python environment inside your cluster. If a Python environment with all required packages is already installed on your YARN nodes, starting your cluster will take much less time. Otherwise you need to pack your local environment and pass it to Mars.
You may use conda-pack to pack your environment when you are using Conda:
conda activate local-env
conda install -c conda-forge conda-pack
conda-pack
or use venv-pack to pack your environment when you are using virtual environments:
source local-env/bin/activate
pip install venv-pack
venv-pack
Both commands will create a tar.gz archive, and you can use it when deploying your Mars cluster.
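Before shipping a packed archive, it can be useful to confirm that it really contains a Python interpreter. The sketch below uses only the standard tarfile module. The function name archive_has_python is hypothetical, and the check assumes that the interpreter sits at bin/python or bin/python3 relative to the archive root; verify that layout against your own archive.

```python
import tarfile


def archive_has_python(archive_path):
    """Return True if the tar.gz archive contains bin/python or bin/python3.

    Assumption: the packed environment keeps its interpreter under bin/
    at the root of the archive.
    """
    with tarfile.open(archive_path, "r:gz") as archive:
        for name in archive.getnames():
            # normalize a possible leading "./" before comparing
            normalized = name[2:] if name.startswith("./") else name
            if normalized in ("bin/python", "bin/python3"):
                return True
    return False
```

For example, archive_has_python('path/to/local/env/pack.tar.gz') could be called right before new_cluster to fail early on an incomplete archive.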
Then it is time to start your Mars cluster. Choose the new_cluster line that matches your setup: an existing conda environment, an existing virtual environment, a Python executable, or a pre-packed environment archive:
import os
import mars.tensor as mt
from mars.deploy.yarn import new_cluster
# specify location of Hadoop and JDK on client side
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'
os.environ['PATH'] = '/usr/local/hadoop:' + os.environ['PATH']
# use a conda environment at /path/to/remote/conda/env
cluster = new_cluster(environment='conda:///path/to/remote/conda/env')
# use a virtual environment at /path/to/remote/virtual/env
cluster = new_cluster(environment='venv:///path/to/remote/virtual/env')
# use a remote python executable
cluster = new_cluster(environment='python:///path/to/remote/python')
# use a local packed environment archive
cluster = new_cluster(environment='path/to/local/env/pack.tar.gz')
# get web endpoint, may be used elsewhere
print(cluster.session.endpoint)
# new_cluster starts a session and sets it as the default one
# execute will then run in the new cluster
a = mt.random.rand(10, 10)
a.dot(a.T).execute()
# after all jobs executed, you can turn off the cluster
cluster.stop()
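The four forms of the environment argument shown above follow a simple pattern: a scheme prefix for environments that already exist on the YARN nodes, and a plain path for a local archive. The helper below is a hypothetical sketch that makes the pattern explicit; environment_spec is not part of Mars, and the requirement that remote paths be absolute is an assumption taken from the examples.

```python
def environment_spec(kind, path):
    """Build the string passed as the `environment` argument.

    kind: one of "conda", "venv", "python" or "archive".
    path: path on the YARN nodes for the first three kinds,
          or a path to a local archive for "archive".
    """
    if kind == "archive":
        # packed archives are passed as a plain path without a scheme
        return path
    if kind not in ("conda", "venv", "python"):
        raise ValueError("unknown environment kind: %r" % (kind,))
    if not path.startswith("/"):
        # assumption: remote locations are given as absolute paths
        raise ValueError("remote paths are expected to be absolute")
    return "%s://%s" % (kind, path)


print(environment_spec("conda", "/path/to/remote/conda/env"))
print(environment_spec("archive", "path/to/local/env/pack.tar.gz"))
```

The returned string can then be passed as new_cluster(environment=...).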
Customizing cluster
The new_cluster function provides several keyword arguments for defining the cluster. You may use the argument app_name to customize the name of the YARN application, or the argument timeout to specify the timeout of cluster creation. Arguments for scaling the cluster up and out are also available.
Arguments for supervisors:
| Argument | Description |
|---|---|
| supervisor_num | Number of supervisors in the cluster, 1 by default |
| supervisor_cpu | Number of CPUs for every supervisor |
| supervisor_mem | Memory size for supervisors in the cluster, in bytes or in size units (the worker example in this section uses '16g') |
| supervisor_extra_env | A dict of environment variables to set in supervisors |
Arguments for workers:
| Argument | Description |
|---|---|
| worker_num | Number of workers in the cluster, 1 by default |
| worker_cpu | Number of CPUs for every worker, required |
| worker_mem | Memory size for workers in the cluster, in bytes or in size units (the example in this section uses '16g') |
| worker_spill_paths | List of spill paths for workers on hosts |
| worker_cache_mem | Size or ratio of shared memory for every worker. Details about memory management of Mars workers can be found in the memory tuning section. |
| min_worker_num | Minimal number of ready workers required before new_cluster stops waiting |
| worker_extra_env | A dict of environment variables to set in workers |
For instance, to create a Mars cluster with 1 supervisor and 100 workers, where each worker has 4 cores and 16 GB of memory, and to stop waiting once 95 workers are ready, you can use the code below:
import os
from mars.deploy.yarn import new_cluster
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'
cluster = new_cluster('path/to/env/pack.tar.gz', supervisor_num=1,
                      worker_num=100, worker_cpu=4, worker_mem='16g',
                      min_worker_num=95)
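When sizing a cluster like this, it helps to see what the worker arguments add up to before submitting the application. The following sketch is illustrative only: worker_sizing is a hypothetical helper, it takes memory as a whole number of gigabytes, and it covers workers but not supervisors. It derives min_worker_num from a readiness percentage and reports the total CPU and memory the workers would request.

```python
def worker_sizing(worker_num, worker_cpu, worker_mem_gb, ready_percent=100):
    """Return worker keyword arguments and the totals they imply."""
    # round up, so that 95% of 10 workers means 10 rather than 9
    min_worker_num = (worker_num * ready_percent + 99) // 100
    kwargs = {
        "worker_num": worker_num,
        "worker_cpu": worker_cpu,
        "worker_mem": "%dg" % worker_mem_gb,
        "min_worker_num": min_worker_num,
    }
    totals = {
        "cpu": worker_num * worker_cpu,
        "mem_gb": worker_num * worker_mem_gb,
    }
    return kwargs, totals


kwargs, totals = worker_sizing(100, 4, 16, ready_percent=95)
print(kwargs["min_worker_num"])  # 95
print(totals)  # {'cpu': 400, 'mem_gb': 1600}
```

The resulting dictionary can be unpacked into the call, for example new_cluster('path/to/env/pack.tar.gz', supervisor_num=1, **kwargs), and the totals can be compared with the capacity of the YARN queue beforehand.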