Source code for mars.services.task.api.oscar
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Union
from .... import oscar as mo
from ....core import Tileable
from ....lib.aio import alru_cache
from ...subtask import SubtaskResult
from ..core import TileableGraph, TaskResult, MapReduceInfo
from ..supervisor.manager import TaskManagerActor
from .core import AbstractTaskAPI
[docs]class TaskAPI(AbstractTaskAPI):
[docs] def __init__(
self, session_id: str, task_manager_ref: mo.ActorRefType[TaskManagerActor]
):
self._session_id = session_id
self._task_manager_ref = task_manager_ref
@classmethod
@alru_cache(cache_exceptions=False)
async def create(cls, session_id: str, address: str) -> "TaskAPI":
"""
Create Task API.
Parameters
----------
session_id : str
Session ID
address : str
Supervisor address.
Returns
-------
task_api
Task API.
"""
task_manager_ref = await mo.actor_ref(
address, TaskManagerActor.gen_uid(session_id)
)
return TaskAPI(session_id, task_manager_ref)
async def get_task_results(self, progress: bool = False) -> List[TaskResult]:
return await self._task_manager_ref.get_task_results(progress)
async def submit_tileable_graph(
self,
graph: TileableGraph,
fuse_enabled: bool = None,
extra_config: dict = None,
) -> str:
try:
return await self._task_manager_ref.submit_tileable_graph(
graph, fuse_enabled=fuse_enabled, extra_config=extra_config
)
except mo.ActorNotExist:
raise RuntimeError("Session closed already")
async def get_tileable_graph_as_json(self, task_id: str):
return await self._task_manager_ref.get_tileable_graph_dict_by_task_id(task_id)
async def get_tileable_details(self, task_id: str):
return await self._task_manager_ref.get_tileable_details(task_id)
async def get_tileable_subtasks(
self, task_id: str, tileable_id: str, with_input_output: bool
):
return await self._task_manager_ref.get_tileable_subtasks(
task_id, tileable_id, with_input_output
)
async def wait_task(self, task_id: str, timeout: float = None):
return await self._task_manager_ref.wait_task(task_id, timeout=timeout)
async def get_task_result(self, task_id: str) -> TaskResult:
return await self._task_manager_ref.get_task_result(task_id)
async def get_task_progress(self, task_id: str) -> float:
return await self._task_manager_ref.get_task_progress(task_id)
async def cancel_task(self, task_id: str):
return await self._task_manager_ref.cancel_task(task_id)
async def get_fetch_tileables(self, task_id: str) -> List[Tileable]:
return await self._task_manager_ref.get_task_result_tileables(task_id)
async def set_subtask_result(self, subtask_result: SubtaskResult):
return await self._task_manager_ref.set_subtask_result.tell(subtask_result)
async def get_last_idle_time(self) -> Union[float, None]:
return await self._task_manager_ref.get_last_idle_time()
async def remove_tileables(self, tileable_keys: List[str]):
return await self._task_manager_ref.remove_tileables(tileable_keys)
async def get_map_reduce_info(
self, task_id: str, map_reduce_id: int
) -> MapReduceInfo:
return await self._task_manager_ref.get_map_reduce_info(task_id, map_reduce_id)