创建和使用批量方法#

Oscar 提供了编写批量方法的一系列 API。你可以向你的 actor 方法添加一个 @extensible 注解从而创建某个方法的批量化版本。在一次批量调用中执行的所有方法将被一次性提交,减少可能的 RPC 代价。

创建批量方法#

你可以通过向现有方法添加 @extensible 装饰器创建批量方法:

import mars.oscar as mo

class ExampleActor(mo.Actor):
    @mo.extensible
    async def batch_method(self, a, b=None):
        pass

某些情况下我们需要在执行批量方法前对请求进行处理,例如按照一定的关键字对请求进行分组,再批量转发到各个特定的处理方法上。Oscar 支持为方法创建特定的批量版本。

class ExampleActor(mo.Actor):
    @mo.extensible
    async def batch_method(self, a, b=None):
        raise NotImplementedError  # this will redirect all requests to the batch version

    @batch_method.batch
    async def batch_method(self, args_list, kwargs_list):
        results = []
        for args, kwargs in zip(args_list, kwargs_list):
            a, b = self.batch_method.bind(*args, **kwargs)
            # process the request
            results.append(result)
        return results  # return a list of results

在上面的代码中,我们在原始方法抛出 NotImplementedError,这会使单次方法调用也由批量方法处理。批量方法由两个参数组成,分别代表原始方法输入的 *args**kwargs。为更容易获得批量方法的参数,Oscar 提供了名为 bind 的工具方法,该方法可从 argskwargs 中获得真正调用方法时所使用的参数。

调用批量方法#

调用批量方法很容易,可以通过 <method_name>.delay 来生成一次调用,再使用 <method_name>.batch 进行批量提交:

ref = await mo.actor_ref(uid='ExampleActor', address='127.0.0.1:13425')
results = await ref.batch_method.batch(
    ref.batch_method.delay(10, b=20),
    ref.batch_method.delay(20),
)