Source code for mars.services.cluster.backends.base
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from typing import AsyncGenerator, Dict, List, Optional, Type
from ..core import NodeRole
[docs]class AbstractClusterBackend(ABC):
name = None
@classmethod
@abstractmethod
async def create(
cls, node_role: NodeRole, lookup_address: Optional[str], pool_address: str
) -> "AbstractClusterBackend":
"""
Parameters
----------
node_role
lookup_address
pool_address
Returns
-------
"""
@abstractmethod
async def watch_supervisors(self) -> AsyncGenerator[List[str], None]:
"""
Watch changes of supervisors
Returns
-------
out : AsyncGenerator[List[str]]
Generator of list of schedulers
"""
@abstractmethod
async def get_supervisors(self, filter_ready: bool = True) -> List[str]:
"""
Get list of supervisors
Parameters
----------
filter_ready : bool
True if return ready nodes only, or return starting and ready nodes
Returns
-------
out : List[str]
List of supervisors
"""
@abstractmethod
async def request_worker(
self, worker_cpu: int = None, worker_mem: int = None, timeout: int = None
) -> str:
"""
Create a new worker
Returns
-------
Address of the new created worker
"""
@abstractmethod
async def release_worker(self, address: str):
"""
Return a worker
"""
@abstractmethod
async def reconstruct_worker(self, address: str):
"""
Reconstruct a worker
"""
_cluster_backend_types: Dict[str, Type[AbstractClusterBackend]] = dict()
def register_cluster_backend(backend: Type[AbstractClusterBackend]):
_cluster_backend_types[backend.name] = backend
return backend
def get_cluster_backend(backend_name: str) -> Type[AbstractClusterBackend]:
return _cluster_backend_types[backend_name]