.. _mars_ray: Run on Ray ================= Mars also has deep integration with Ray and can run on `Ray `_ efficiently and natively. Basic steps ----------- Install Ray locally: .. code-block:: bash pip install ray (Optional) Start a Ray cluster or Mars starts a Ray cluster automatically: .. code-block:: python import ray ray.init() (Optional) Or connecting to a existing Ray cluster using `Ray client `_: .. code-block:: python import ray ray.init(address='ray://:10001') Creating a Mars on Ray runtime in the Ray cluster and do the computing: .. code-block:: python import mars import mars.tensor as mt import mars.dataframe as md # This driver is the Mars supervisor. session = mars.new_session(backend='ray') mt.random.RandomState(0).rand(1000_0000, 5).sum().execute() df = md.DataFrame( mt.random.rand(1000_0000, 4, chunk_size=500_0000), columns=list('abcd')) print(df.sum().execute()) print(df.describe().execute()) # Convert mars dataframe to ray dataset ds = md.to_ray_dataset(df) print(ds.schema(), ds.count()) ds.filter(lambda row: row['a'] > 0.5).show(5) # Convert ray dataset to mars dataframe df2 = md.read_ray_dataset(ds) print(df2.head(5).execute()) Stop the created Mars on Ray runtime: .. code-block:: python session.stop_server() Customizing cluster ------------------- There are two ways to initialize a Mars on Ray session: - `mars.new_session(...) # Start Mars supervisor in current process.` Recommend for most use cases. - `mars.new_ray_session(...) # Start a Ray actor for Mars supervisor.` Recommend for large scale compute or compute through Ray client. Start a Ray actor for Mars supervisor: .. code-block:: python import mars # Start a Ray actor for Mars supervisor. session = mars.new_ray_session(backend='ray') Connect to the created Mars on Ray runtime and do the computing, the supervisor virtual address is the name of Ray actor for Mars supervisor, e.g. `ray://ray-cluster-1672904753/0/0`. .. code-block:: python import mars import mars.tensor as mt # Be aware that `mars.new_ray_session()` connects to an existing Mars # cluster requires Ray runtime. # e.g. Current process is a initialized Ray driver, client or worker. session = mars.new_ray_session( address='ray://', session_id='abcd', backend='ray', default=True) session.execute(mt.random.RandomState(0).rand(100, 5).sum()) The ``new_ray_session`` function provides several keyword arguments for users to define the cluster. Arguments for supervisors: +--------------------+-----------------------------------------------------------------+ | Argument | Description | +====================+=================================================================+ | supervisor_cpu | Number of CPUs for supervisor, 1 by default. | +--------------------+-----------------------------------------------------------------+ | supervisor_mem | Memory size for supervisor in bytes, 1G by default. | +--------------------+-----------------------------------------------------------------+ Arguments for workers: +--------------------+-----------------------------------------------------------------+ | Argument | Description | +====================+=================================================================+ | worker_cpu | Number of CPUs for every worker, 2 by default. | +--------------------+-----------------------------------------------------------------+ | worker_mem | Memory size for workers in bytes, 2G by default. | +--------------------+-----------------------------------------------------------------+ For instance, if you want to create a Mars cluster with a standalone supervisor, you can use the code below (In this example, one Ray node has 16 CPUs in total): .. code-block:: python import mars session = mars.new_ray_session(supervisor_cpu=16)