Source code for

# Copyright 1999-2021 Alibaba Group Holding Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional, Tuple, Type, TypeVar, Union

from .... import oscar as mo
from ....lib.aio import alru_cache
from ...subtask import Subtask
from ..core import SubtaskScheduleSummary
from .core import AbstractSchedulingAPI

# Type variable bound to ``SchedulingAPI``; the ``create`` classmethods below
# annotate ``cls`` as ``Type[APIType]`` and their return value as ``APIType``.
APIType = TypeVar("APIType", bound="SchedulingAPI")

class SchedulingAPI(AbstractSchedulingAPI):
    """
    Scheduling API that forwards calls to actor references.

    Subtask submission, cancellation, completion and summary queries go to
    the manager ref, priority updates go to the queueing ref, and
    autoscale-in switches go to the autoscaler ref.
    """

    def __init__(
        self,
        session_id: str,
        address: str,
        manager_ref=None,
        queueing_ref=None,
        autoscaler_ref=None,
    ):
        """
        Store the session information and the actor references to call.

        Parameters
        ----------
        session_id
            id of the session this API belongs to
        address
            address used to look up the actors
        manager_ref
            reference whose methods handle subtask management calls
        queueing_ref
            reference whose methods handle priority updates
        autoscaler_ref
            reference whose methods handle autoscale-in switches
        """
        self._session_id = session_id
        self._address = address
        self._manager_ref = manager_ref
        self._queueing_ref = queueing_ref
        self._autoscaler = autoscaler_ref

    @classmethod
    @alru_cache
    async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
        """
        Build an API instance by resolving the actor references it needs.

        The result is cached by ``alru_cache``, keyed on the call arguments
        (including ``cls``).

        Parameters
        ----------
        session_id
            id of the session
        address
            address where the actors are looked up

        Returns
        -------
        instance of ``cls`` bound to the resolved references
        """
        # NOTE(review): imports are kept local as in the original code,
        # presumably to avoid import cycles at module load — confirm.
        from ...cluster import ClusterAPI
        from ..supervisor.autoscale import AutoscalerActor
        from ..supervisor.manager import SubtaskManagerActor
        from ..supervisor.queueing import SubtaskQueueingActor

        manager_ref = await mo.actor_ref(
            SubtaskManagerActor.gen_uid(session_id), address=address
        )
        queueing_ref = await mo.actor_ref(
            SubtaskQueueingActor.gen_uid(session_id), address=address
        )

        cluster_api = await ClusterAPI.create(address)
        # exactly one uid is requested, so exactly one ref is unpacked
        [autoscaler] = await cluster_api.get_supervisor_refs(
            [AutoscalerActor.default_uid()]
        )

        # Instantiate ``cls`` instead of hard-coding ``SchedulingAPI`` so that
        # subclasses calling this factory get an instance of their own type,
        # as the ``Type[APIType] -> APIType`` annotation promises.
        return cls(session_id, address, manager_ref, queueing_ref, autoscaler)

    async def get_subtask_schedule_summaries(
        self, task_id: Optional[str] = None
    ) -> List[SubtaskScheduleSummary]:
        """
        Fetch schedule summaries from the manager.

        Parameters
        ----------
        task_id
            passed through to the manager unchanged; defaults to None

        Returns
        -------
        list of schedule summaries returned by the manager
        """
        return await self._manager_ref.get_schedule_summaries(task_id)

    async def add_subtasks(
        self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None
    ):
        """
        Submit subtasks into scheduling service

        Parameters
        ----------
        subtasks
            list of subtasks to be submitted to service
        priorities
            list of priorities of subtasks; when None, each subtask's own
            ``priority`` is used, falling back to an empty tuple when that
            value is falsy
        """
        if priorities is None:
            priorities = [subtask.priority or tuple() for subtask in subtasks]
        await self._manager_ref.add_subtasks(subtasks, priorities)

    @mo.extensible
    async def update_subtask_priority(self, subtask_id: str, priority: Tuple):
        """
        Update priority of a subtask

        Only the batch form registered below is implemented; this
        single-call body raises ``NotImplementedError``.

        Parameters
        ----------
        subtask_id
            id of subtask to update priority
        priority
            new priority of the subtask
        """
        raise NotImplementedError

    @update_subtask_priority.batch
    async def update_subtask_priority(self, args_list, kwargs_list):
        # Re-issue every collected call as a delayed call on the queueing ref
        # and send them together as one batch.
        await self._queueing_ref.update_subtask_priority.batch(
            *(
                self._queueing_ref.update_subtask_priority.delay(*args, **kwargs)
                for args, kwargs in zip(args_list, kwargs_list)
            )
        )

    async def cancel_subtasks(
        self,
        subtask_ids: List[str],
        kill_timeout: Optional[Union[float, int]] = None,
    ):
        """
        Cancel pending and running subtasks.

        Parameters
        ----------
        subtask_ids
            ids of subtasks to cancel
        kill_timeout
            timeout seconds to kill actor process forcibly
        """
        await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout)

    async def finish_subtasks(
        self,
        subtask_ids: List[str],
        bands: Optional[List[Tuple]] = None,
        schedule_next: bool = True,
    ):
        """
        Mark subtasks as finished, letting scheduling service
        schedule next tasks in the ready queue

        Parameters
        ----------
        subtask_ids
            ids of subtasks to mark as finished
        bands
            bands of subtasks to mark as finished
        schedule_next
            whether to schedule succeeding subtasks
        """
        await self._manager_ref.finish_subtasks(subtask_ids, bands, schedule_next)

    async def disable_autoscale_in(self):
        """Disable autoscale in"""
        await self._autoscaler.disable_autoscale_in()

    async def try_enable_autoscale_in(self):
        """
        Try to enable autoscale in. Autoscale-in is only enabled once every
        preceding `disable_autoscale_in` call has been matched by a call to
        this method.
        """
        await self._autoscaler.try_enable_autoscale_in()
class MockSchedulingAPI(SchedulingAPI):
    """
    Scheduling API whose ``create`` first starts the actors and the session
    it depends on, then defers to the parent factory.

    NOTE(review): the name suggests this is intended for tests — confirm.
    """

    @classmethod
    async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
        """
        Create the required actors and session at ``address``, then build
        the API through the parent ``create``.

        Parameters
        ----------
        session_id
            id of the session to create
        address
            address where the actors are created

        Returns
        -------
        the object returned by the parent ``create``
        """
        # Function-scope imports, gathered in one place.
        from .... import resource as mars_resource
        from ..supervisor import (
            AutoscalerActor,
            GlobalResourceManagerActor,
            SchedulingSupervisorService,
        )
        from ..worker import (
            SubtaskExecutionActor,
            WorkerQuotaManagerActor,
            WorkerSlotManagerActor,
        )

        # Actors imported from the supervisor package.
        resource_manager_cls = GlobalResourceManagerActor
        await mo.create_actor(
            resource_manager_cls,
            uid=resource_manager_cls.default_uid(),
            address=address,
        )
        autoscaler_conf = {}
        await mo.create_actor(
            AutoscalerActor,
            autoscaler_conf,
            uid=AutoscalerActor.default_uid(),
            address=address,
        )

        # Actors imported from the worker package.
        await mo.create_actor(
            SubtaskExecutionActor,
            subtask_max_retries=0,
            uid=SubtaskExecutionActor.default_uid(),
            address=address,
        )
        await mo.create_actor(
            WorkerSlotManagerActor,
            uid=WorkerSlotManagerActor.default_uid(),
            address=address,
        )
        # quota size is the total reported by ``virtual_memory()``
        quota_conf = {"quota_size": mars_resource.virtual_memory().total}
        await mo.create_actor(
            WorkerQuotaManagerActor,
            quota_conf,
            uid=WorkerQuotaManagerActor.default_uid(),
            address=address,
        )

        # Create the session through the supervisor service before the
        # parent factory runs.
        supervisor_service = SchedulingSupervisorService({}, address)
        await supervisor_service.create_session(session_id)

        api = await super().create(session_id, address)
        return api