Create and Use Batch Methods#
Oscar provides a set of APIs to write batch methods. You can simply add a
@extensible
decorator to your actor method and create a batch version. All
calls wrapped in a batch will be sent together, reducing possible RPC cost.
Create a Batch Method#
You can create a batch method with a @extensible
decorator:
import mars.oscar as mo
class ExampleActor(mo.Actor):
@mo.extensible
async def batch_method(self, a, b=None):
pass
Sometimes we need to process received batch requests. For instance, we need to group requests by certain keys and resent them to different handlers in batches. Oscar supports creating a batch version of the method:
class ExampleActor(mo.Actor):
@mo.extensible
async def batch_method(self, a, b=None):
raise NotImplementedError # this will redirect all requests to the batch version
@batch_method.batch
async def batch_method(self, args_list, kwargs_list):
results = []
for args, kwargs in zip(args_list, kwargs_list):
a, b = self.batch_method.bind(*args, **kwargs)
# process the request
results.append(result)
return results # return a list of results
In the code above, we simply raises a NotImplementedError
to let the batch
version handle all requests. The batch version have two arguments accepting
args
and kwargs
of all batched calls as lists. To make argument
extraction easier, a utility function bind
is added as an attribute of the
method which extracts args
and kwargs
into real arguments.
Call Batch Methods#
Calling batch methods is easy. You can use <method_name>.delay
to make a
batched call and use <method_name>.batch
to send them:
ref = await mo.actor_ref(uid='ExampleActor', address='127.0.0.1:13425')
results = await ref.batch_method.batch(
ref.batch_method.delay(10, b=20),
ref.batch_method.delay(20),
)