Run on YARN#

Mars can be deployed on YARN clusters. You can use mars.deploy.yarn to start Mars clusters in Hadoop environments.

Basic steps#

Mars uses Skein to deploy itself into YARN clusters. This library bridges Java interfaces of YARN applications and Python interfaces.

Before starting Mars in YARN, you need to check your environments first. As Skein supports Linux only, you need to work on a Linux client, otherwise you need to fix and compile a number of packages yourself. Skein library is also needed on client side. You may install Skein with conda

conda install -c conda-forge skein

or install with pip

pip install skein

Then you need to check Python environment inside your cluster. If you have a Python environment installed within your YARN nodes with every required packages installed, it will save a lot of time for you to start your cluster. Otherwise you need to pack your local environment and specify it to Mars.

You may use conda-pack to pack your environment when you are using Conda:

conda activate local-env
conda install -c conda-forge conda-pack

or use venv-pack to pack your environment when you are using virtual environments:

source local-env/bin/activate
pip install venv-pack

Both commands will create a tar.gz archive, and you can use it when deploying your Mars cluster.

Then it is time to start your Mars cluster. Select different lines when you are starting from existing a conda environment, virtual environment, Python executable or pre-packed environment archive:

import os
from mars.deploy.yarn import new_cluster

# specify location of Hadoop and JDK on client side
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'
os.environ['PATH'] = '/usr/local/hadoop:' + os.environ['PATH']

# use a conda environment at /path/to/remote/conda/env
cluster = new_cluster(environment='conda:///path/to/remote/conda/env')

# use a virtual environment at /path/to/remote/virtual/env
cluster = new_cluster(environment='venv:///path/to/remote/virtual/env')

# use a remote python executable
cluster = new_cluster(environment='python:///path/to/remote/python')

# use a local packed environment archive
cluster = new_cluster(environment='path/to/local/env/pack.tar.gz')

# get web endpoint, may be used elsewhere

# new cluster will start a session and set it as default one
# execute will then run in the local cluster
a = mt.random.rand(10, 10)

# after all jobs executed, you can turn off the cluster

Customizing cluster#

new_cluster function provides several keyword arguments for users to define the cluster. You may use the argument app_name to customize the name of the Yarn application, or use the argument timeout to specify timeout of cluster creation. Arguments for scaling up and out of the cluster are also available.

Arguments for supervisors:




Number of supervisors in the cluster, 1 by default


Number of CPUs for every supervisor


Memory size for supervisors in the cluster, in bytes or size units like 1g


A dict of environment variables to set in supervisors

Arguments for workers:




Number of workers in the cluster, 1 by default


Number of CPUs for every worker, required.


Memory size for workers in the cluster, in bytes or size units like 1g, required.


List of spill paths for worker pods on hosts


Size or ratio of shared memory for every worker. Details about memory management of Mars workers can be found in memory tuning section.


Minimal number of ready workers for new_cluster to return, worker_num by default


A dict of environment variables to set in workers.

For instance, if you want to create a Mars cluster with 1 supervisor and 100 workers, each worker has 4 cores and 16GB memory, and stop waiting when 95 workers are ready, you can use the code below:

import os
from mars.deploy.yarn import new_cluster

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'

cluster = new_cluster('path/to/env/pack.tar.gz', supervisor_num=1, web_num=1,
                      worker_num=100, worker_cpu=4, worker_mem='16g',