DASK on Mars

注解

自 0.8.0a2 起支持

Dask-on-Mars 使得用户能通过简单的 API 调用,在 Mars 中运行大部分 Dask 任务

Dask 是一个用于并行计算的 Python 库,旨在为大规模数据的分析和科学计算提供并行的计算解决方案。

注解

为了在 Mars 上运行 Dask 任务,用户不应该使用 Dask.distributed 相关特性,只需使用普通的 Dask 特性和功能

使用 Dask 调度器

Dask-on-Mars 的主要接口是 mars.contrib.dask.mars_scheduler(),它兼容了 Dask 的 scheduler 接口,这使得用户可以直接指定使用 mars_scheduler 来调度执行 Dask 任务。

>>> import dask
>>> from mars.contrib.dask import mars_scheduler
>>>
>>> def inc(x):
>>>     return x + 1
>>>
>>> dask_task = dask.delayed(inc)(1)
>>> dask_task.compute(scheduler=mars_scheduler) # Run delayed object on top of Mars
2

也可以把 Mars 的调度器设置为默认。

>>> import dask
>>> from mars.contrib.dask import mars_scheduler
>>> dask.config.set(scheduler=mars_scheduler)

将 Dask 任务转变为 Mars 任务

当用户需要使用 Mars remote API 或其他 Mars 特性来对任务进行更改时,可以使用 mars.contrib.dask.convert_dask_collection() 来将 Dask 任务转变为 Mars 任务。这一函数返回的 Mars 对象与 mars.remote.spawn() 返回的对象类型一致。

>>> import dask
>>> import mars.remote as mr
>>> from mars.contrib.dask import convert_dask_collection
>>>
>>> def inc(x):
>>>     return x + 1
>>>
>>> dask_task = dask.delayed(inc)(1)
>>> mars_obj = convert_dask_collection(dask_task) # Convert Dask object to Mars object
>>> mars_task = mr.spawn(inc, args=(mars_obj,))
>>> mars_task
Object <op=RemoteFunction, key=14a77b28d32904002829b2e8c6474b56>
>>> mars_task.execute().fetch()
3

Dask-on-Mars 是一个实验性的项目。如果您发现运行存在问题,请在Issue中报告。