Dask on Mars¶
Note
New in version 0.8.0a2
Dask-on-Mars provides a simple way to execute the entire Dask ecosystem on top of Mars.
Dask is a flexible library for parallel computing in Python, geared towards scaling analytics and scientific computing workloads. It provides big data collections and dynamic task scheduling optimized for computation.
Note
When executing on Mars, do not use the Dask.distributed client; use plain Dask collections and functionality instead.
Scheduler¶
The main API for Dask-on-Mars is mars.contrib.dask.mars_scheduler(). It uses Dask’s scheduler API, which allows you to specify any callable as the scheduler that you would like Dask to use to execute your workload.
>>> import dask
>>> from mars.contrib.dask import mars_scheduler
>>>
>>> def inc(x):
...     return x + 1
>>>
>>> dask_task = dask.delayed(inc)(1)
>>> dask_task.compute(scheduler=mars_scheduler) # Run delayed object on top of Mars
2
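To make "any callable as the scheduler" more concrete, here is a minimal sketch of the interface such a callable follows: Dask's documented custom-scheduler signature is `get(dsk, keys, **kwargs)`, where `dsk` is a task graph and `keys` names the results to return. The sketch assumes the classic dict-based graph format (tasks written as tuples with a callable first) and uses only the standard library. The names `toy_get`, `_evaluate`, and `_is_task` are hypothetical and purely illustrative; this is not how `mars_scheduler` is implemented.

```python
# Toy illustration only: NOT the implementation of mars_scheduler.
# Assumes the classic dict-based Dask graph format.
from operator import add


def inc(x):
    return x + 1


def _is_task(obj):
    """A classic Dask task is a tuple whose first element is callable."""
    return isinstance(obj, tuple) and len(obj) > 0 and callable(obj[0])


def _evaluate(expr, dsk, cache):
    """Recursively evaluate a task, a key, a list of either, or a literal."""
    if _is_task(expr):
        func, *args = expr
        return func(*(_evaluate(arg, dsk, cache) for arg in args))
    if isinstance(expr, list):
        return [_evaluate(item, dsk, cache) for item in expr]
    try:
        is_key = expr in dsk
    except TypeError:  # unhashable literal, so it cannot be a graph key
        is_key = False
    if is_key:
        if expr not in cache:  # compute each key at most once
            cache[expr] = _evaluate(dsk[expr], dsk, cache)
        return cache[expr]
    return expr  # plain literal


def toy_get(dsk, keys, **kwargs):
    """Scheduler-shaped callable: takes a graph and the keys to compute."""
    return _evaluate(keys, dsk, {})


# A hand-written graph: c = add(a, inc(a)) with a = 1
dsk = {"a": 1, "b": (inc, "a"), "c": (add, "a", "b")}
single = toy_get(dsk, "c")
several = toy_get(dsk, ["b", "c"])
```

A real scheduler such as `mars_scheduler` has the same outward shape but dispatches the tasks to a cluster instead of evaluating them in-process. Recent Dask releases may pass graphs built from newer task objects rather than plain tuples, so treat this sketch as an aid to understanding the interface, not as a drop-in scheduler.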
You can also set the Mars scheduler as the default scheduler.
>>> import dask
>>> from mars.contrib.dask import mars_scheduler
>>> dask.config.set(scheduler=mars_scheduler)
Convert Dask Collections¶
mars.contrib.dask.convert_dask_collection() can be used when you need to manipulate Dask collections with the Mars remote API or other features. It converts Dask collections such as delayed objects or Dask DataFrames to Mars objects, which can be treated like results returned by mars.remote.spawn().
>>> import dask
>>> import mars.remote as mr
>>> from mars.contrib.dask import convert_dask_collection
>>>
>>> def inc(x):
...     return x + 1
>>>
>>> dask_task = dask.delayed(inc)(1)
>>> mars_obj = convert_dask_collection(dask_task) # Convert Dask object to Mars object
>>> mars_task = mr.spawn(inc, args=(mars_obj,))
>>> mars_task
Object <op=RemoteFunction, key=14a77b28d32904002829b2e8c6474b56>
>>> mars_task.execute().fetch()
3
Dask-on-Mars is an ongoing project. Please open an issue if you find a Dask feature that doesn’t run on Mars.