mars.remote.spawn#

mars.remote.spawn(func, args=(), kwargs=None, retry_when_fail=False, resolve_tileable_input=False, n_output=None, **kw)[source]#

Spawn a function and return a Mars Object which can be executed later.

Parameters
  • func (function) – Function to spawn.

  • args (tuple) – Args to pass to function

  • kwargs (dict) – Kwargs to pass to function

  • retry_when_fail (bool, default False) – If True, retry when function failed.

  • resolve_tileable_input (bool default False) – If True, resolve tileable inputs as values.

  • n_output (int) – Count of outputs for the function

Returns

Mars Object.

Return type

Object

Examples

>>> import mars.remote as mr
>>> def inc(x):
>>>     return x + 1
>>>
>>> result = mr.spawn(inc, args=(0,))
>>> result
Object <op=RemoteFunction, key=e0b31261d70dd9b1e00da469666d72d9>
>>> result.execute().fetch()
1

List of spawned functions can be converted to mars.remote.ExecutableTuple, and .execute() can be called to run together.

>>> results = [mr.spawn(inc, args=(i,)) for i in range(10)]
>>> mr.ExecutableTuple(results).execute().fetch()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Mars Object returned by mars.remote.spawn() can be treated as arguments for other spawn functions.

>>> results = [mr.spawn(inc, args=(i,)) for i in range(10)]   # list of spawned functions
>>> def sum_all(xs):
        return sum(xs)
>>> mr.spawn(sum_all, args=(results,)).execute().fetch()
55

inside a spawned function, new functions can be spawned.

>>> def driver():
>>>     results = [mr.spawn(inc, args=(i,)) for i in range(10)]
>>>     return mr.ExecutableTuple(results).execute().fetch()
>>>
>>> mr.spawn(driver).execute().fetch()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Mars tensor, DataFrame and so forth is available in spawned functions as well.

>>> import mars.tensor as mt
>>> def driver2():
>>>     t = mt.random.rand(10, 10)
>>>     return t.sum().to_numpy()
>>>
>>> mr.spawn(driver2).execute().fetch()
52.47844223908132

Argument of n_output can indicate that the spawned function will return multiple outputs. This is important when some of the outputs may be passed to different functions.

>>> def triage(alist):
>>>     ret = [], []
>>>     for i in alist:
>>>         if i < 0.5:
>>>             ret[0].append(i)
>>>         else:
>>>             ret[1].append(i)
>>>     return ret
>>>
>>> def sum_all(xs):
>>>     return sum(xs)
>>>
>>> l = [0.4, 0.7, 0.2, 0.8]
>>> la, lb = mr.spawn(triage, args=(l,), n_output=2)
>>>
>>> sa = mr.spawn(sum_all, args=(la,))
>>> sb = mr.spawn(sum_all, args=(lb,))
>>> mr.ExecutableTuple([sa, sb]).execute().fetch()
>>> [0.6000000000000001, 1.5]