创建和使用批量方法#
Oscar 提供了编写批量方法的一系列 API。你可以向你的 actor 方法添加一个 @extensible
注解从而创建某个方法的批量化版本。在一次批量调用中执行的所有方法将被一次性提交,减少可能的 RPC 代价。
创建批量方法#
你可以通过向现有方法添加 @extensible
装饰器创建批量方法:
import mars.oscar as mo
class ExampleActor(mo.Actor):
@mo.extensible
async def batch_method(self, a, b=None):
pass
某些情况下我们需要在执行批量方法前对请求进行处理,例如按照一定的关键字对请求进行分组,再批量转发到各个特定的处理方法上。Oscar 支持为方法创建特定的批量版本。
class ExampleActor(mo.Actor):
@mo.extensible
async def batch_method(self, a, b=None):
raise NotImplementedError # this will redirect all requests to the batch version
@batch_method.batch
async def batch_method(self, args_list, kwargs_list):
results = []
for args, kwargs in zip(args_list, kwargs_list):
a, b = self.batch_method.bind(*args, **kwargs)
# process the request
results.append(result)
return results # return a list of results
在上面的代码中,我们在原始方法抛出 NotImplementedError
,这会使单次方法调用也由批量方法处理。批量方法由两个参数组成,分别代表原始方法输入的 *args
和 **kwargs
。为更容易获得批量方法的参数,Oscar 提供了名为 bind
的工具方法,该方法可从 args
和 kwargs
中获得真正调用方法时所使用的参数。
调用批量方法#
调用批量方法很容易,可以通过 <method_name>.delay
来生成一次调用,再使用 <method_name>.batch
进行批量提交:
ref = await mo.actor_ref(uid='ExampleActor', address='127.0.0.1:13425')
results = await ref.batch_method.batch(
ref.batch_method.delay(10, b=20),
ref.batch_method.delay(20),
)