用户指南

注解

自 0.4.1 起支持

Mars remote 接口提供了一个简单但强大的方式来并行执行一系列 Python 函数。

Mars remote 主要的接口就是 mars.remote.spawn(),它返回了一个 Mars 对象,此时还没有任何的执行发生。当 .execute 调用的时候,这个函数会被 Mars 执行,因此如果有多个函数一起执行,它们会并行运行。

>>> import mars.remote as mr
>>> def inc(x):
>>>     return x + 1
>>>
>>> result = mr.spawn(inc, args=(0,))
>>> result
Object <op=RemoteFunction, key=e0b31261d70dd9b1e00da469666d72d9>
>>> result.execute().fetch()
1

使用 mars.remote.ExecutableTuple 来执行多个函数。

>>> results = [mr.spawn(inc, args=(i,)) for i in range(10)]
>>> mr.ExecutableTuple(results).execute().fetch()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

mars.remote.spawn() 返回的 Mars 对象能作为其他函数的参数。

>>> results = [mr.spawn(inc, args=(i,)) for i in range(10)]   # list of spawned functions
>>> def sum_all(xs):
        return sum(xs)
>>> mr.spawn(sum_all, args=(results,)).execute().fetch()
55

Mars 会保证只有前10个 inc 执行完毕,才会去执行 sum_all 函数。用户不需要担心数据依赖,比如当 sum_call 执行的时候,xs 参数会被前序 inc 函数的真实输出给替代。

如果在分布式环境中执行,10个 inc 函数可能会被分配到不同的 worker 上。用户不需要关心这些函数是怎么分布的,亦不需要关心这些函数的输出如何在 worker 间传输。

用户可以在被 spawn 的函数里 spawn 新的函数。

>>> def driver():
>>>     results = [mr.spawn(inc, args=(i,)) for i in range(10)]
>>>     return mr.ExecutableTuple(results).execute().fetch()
>>>
>>> mr.spawn(driver).execute().fetch()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Spawn 的函数里也可以使用 Mars tensor、DataFrame 等等。

>>> import mars.tensor as mt
>>> def driver2():
>>>     t = mt.random.rand(10, 10)
>>>     return t.sum().to_numpy()
>>>
>>> mr.spawn(driver2).execute().fetch()
52.47844223908132

n_output 参数可以指定一个函数返回值的个数,当这些返回值被不同的函数使用时会很有用。

>>> def triage(alist):
>>>     ret = [], []
>>>     for i in alist:
>>>         if i < 0.5:
>>>             ret[0].append(i)
>>>         else:
>>>             ret[1].append(i)
>>>     return ret
>>>
>>> def sum_all(xs):
>>>     return sum(xs)
>>>
>>> l = [0.4, 0.7, 0.2, 0.8]
>>> la, lb = mr.spawn(triage, args=(l,), n_output=2)
>>>
>>> sa = mr.spawn(sum_all, args=(la,))
>>> sb = mr.spawn(sum_all, args=(lb,))
>>> mr.ExecutableTuple([sa, sb]).execute().fetch()
>>> [0.6000000000000001, 1.5]