Run on YARN

Mars can be deployed on YARN clusters. You can use mars.deploy.yarn to start Mars clusters in Hadoop environments.

Basic steps

Mars uses Skein to deploy itself into YARN clusters. Skein bridges the Java interfaces of YARN applications to a Python interface.

Before starting Mars in YARN, you need to check your environment first. As Skein supports Linux only, you need to work on a Linux client; otherwise you will have to fix and compile a number of packages yourself. The Skein library is also needed on the client side. You may install Skein with conda

conda install -c conda-forge skein

or install with pip

pip install skein

Then you need to check the Python environment inside your cluster. If a Python environment with all required packages is already installed on every YARN node, it will save you a lot of time when starting your cluster. Otherwise you need to pack your local environment and pass it to Mars when deploying.

You may use conda-pack to pack your environment when you are using Conda:

conda activate local-env
conda install -c conda-forge conda-pack
conda-pack

or use venv-pack to pack your environment when you are using virtual environments:

source local-env/bin/activate
pip install venv-pack
venv-pack

Both commands will create a tar.gz archive, and you can use it when deploying your Mars cluster.

Then it is time to start your Mars cluster. Pick the line that matches your scenario, depending on whether you are starting from an existing conda environment, a virtual environment, a Python executable or a pre-packed environment archive:

import os
import mars.tensor as mt
from mars.deploy.yarn import new_cluster

# specify location of Hadoop and JDK on client side
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'
os.environ['PATH'] = '/usr/local/hadoop/bin:' + os.environ['PATH']

# use a conda environment at /path/to/remote/conda/env
cluster = new_cluster(environment='conda:///path/to/remote/conda/env')

# use a virtual environment at /path/to/remote/virtual/env
cluster = new_cluster(environment='venv:///path/to/remote/virtual/env')

# use a remote python executable
cluster = new_cluster(environment='python:///path/to/remote/python')

# use a local packed environment archive
cluster = new_cluster(environment='path/to/local/env/pack.tar.gz')

# get web endpoint, may be used elsewhere
print(cluster.session.endpoint)

# new cluster will start a session and set it as default one
# execute will then run in the YARN cluster
a = mt.random.rand(10, 10)
a.dot(a.T).execute()

# after all jobs are executed, you can stop the cluster
cluster.stop()
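
The web endpoint printed above can also be used to connect to the cluster from another client process. A minimal sketch, assuming a Mars version where mars.new_session accepts the web endpoint URL:

import mars
import mars.tensor as mt

# connect to the running cluster; replace the URL with the value of
# cluster.session.endpoint printed by the deploying client
mars.new_session('http://<web_host>:<web_port>')

# subsequent executions run in the YARN cluster
a = mt.random.rand(10, 10)
a.sum().execute()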

Customizing cluster

The new_cluster function provides several keyword arguments for users to customize the cluster. You may use the argument app_name to customize the name of the YARN application, or the argument timeout to specify a timeout for cluster creation. Arguments for scaling the cluster up and out are also available, as listed in the tables below.
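
For instance, a minimal sketch naming the application and bounding startup time; the name 'mars-on-yarn' is hypothetical, and the timeout is assumed to be given in seconds:

# hypothetical application name; timeout assumed to be in seconds
cluster = new_cluster(environment='conda:///path/to/remote/conda/env',
                      app_name='mars-on-yarn', timeout=600)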

Arguments for supervisors:

Argument              Description
--------------------  ----------------------------------------------------------
supervisor_num        Number of supervisors in the cluster, 1 by default
supervisor_cpu        Number of CPUs for every supervisor
supervisor_mem        Memory size for supervisors in the cluster, in bytes or
                      size units like 1g
supervisor_extra_env  A dict of environment variables to set in supervisors

Arguments for workers:

Argument            Description
------------------  ------------------------------------------------------------
worker_num          Number of workers in the cluster, 1 by default
worker_cpu          Number of CPUs for every worker, required
worker_mem          Memory size for workers in the cluster, in bytes or size
                    units like 1g, required
worker_spill_paths  List of spill paths for worker containers on hosts
worker_cache_mem    Size or ratio of shared memory for every worker; details
                    about memory management of Mars workers can be found in the
                    memory tuning section
min_worker_num      Minimal number of ready workers for new_cluster to return,
                    worker_num by default
worker_extra_env    A dict of environment variables to set in workers
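
To illustrate the cache and spill arguments, here is a hedged sketch; the percentage string for worker_cache_mem and the spill directories are assumptions, and the directories must already exist on the YARN hosts:

# reserve part of each worker's memory as shared cache ('40%' is an
# assumed ratio format) and spill to local directories when it fills up
cluster = new_cluster(environment='conda:///path/to/remote/conda/env',
                      worker_num=4, worker_cpu=4, worker_mem='16g',
                      worker_cache_mem='40%',
                      worker_spill_paths=['/mnt/disk0/spill', '/mnt/disk1/spill'])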

For instance, if you want to create a Mars cluster with 1 supervisor and 100 workers, where each worker has 4 cores and 16GB of memory, and you want new_cluster to return once 95 workers are ready, you can use the code below:

import os
from mars.deploy.yarn import new_cluster

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'

cluster = new_cluster('path/to/env/pack.tar.gz', supervisor_num=1,
                      worker_num=100, worker_cpu=4, worker_mem='16g',
                      min_worker_num=95)
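
As min_worker_num is 95, new_cluster will return once 95 workers are ready instead of waiting for all 100 to start.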