Session

Sessions can be used for local execution, for connecting to a local cluster, or for connecting to an existing Mars cluster.

If a session is not initialized explicitly, Mars will create a session for local execution by default.

>>> import mars.dataframe as md
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # will create a default session for local execution
   0  1
0  1  2
1  3  4
>>> df.fetch()
   0  1
0  1  2
1  3  4

The function new_session can be used to create a new default session.

>>> import mars
>>> mars.new_session()  # create a new default session
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # execute on the session just created
   0  1
0  1  2
1  3  4
>>> df.fetch()  # fetch from the session just created
   0  1
0  1  2
1  3  4

A session can also be passed explicitly as an argument to both execute and fetch.

>>> import mars
>>> import mars.tensor as mt
>>> sess = mars.new_session(default=False)
>>> t = mt.random.rand(3, 2)
>>> t.execute(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
>>> t.fetch(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])

Calling .as_default() on a session sets it as the default session; subsequent calls to .execute() and .fetch() will then be constrained to the default session.

Each session is isolated. Calling .fetch() on a Mars object that was executed in another session will fail.

>>> import mars
>>> sess = mars.new_session(default=False)
>>> df.fetch(session=sess)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-7-f10708ec743f> in <module>
----> 1 df.fetch(session=sess)

~/Workspace/mars/mars/dataframe/core.py in fetch(self, session, **kw)
    525             return self._fetch(session=session, **kw)
    526         else:
--> 527             batches = list(self._iter(batch_size=batch_size,
    528                                       session=session, **kw))
    529             return pd.concat(batches) if len(batches) > 1 else batches[0]

~/Workspace/mars/mars/dataframe/core.py in _iter(self, batch_size, session, **kw)
    509                 yield batch_data._fetch(session=session, **kw)
    510         else:
--> 511             yield self._fetch(session=session, **kw)
    512
    513     def iterbatch(self, batch_size=1000, session=None, **kw):

~/Workspace/mars/mars/core/entity/executable.py in _fetch(self, session, **kw)
    120         session = _get_session(self, session)
    121         self._check_session(session, 'fetch')
--> 122         return fetch(self, session=session, **kw)
    123
    124     def fetch(self, session: SessionType = None, **kw):

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(tileable, session, *tileables, **kwargs)
   1391
   1392     session = _ensure_sync(session)
-> 1393     return session.fetch(tileable, *tileables, **kwargs)
   1394
   1395

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(self, *tileables, **kwargs)
   1240     def fetch(self, *tileables, **kwargs) -> list:
   1241         coro = _fetch(*tileables, session=self._isolated_session, **kwargs)
-> 1242         return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
   1243
   1244     @implements(AbstractSyncSession.decref)

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/Workspace/mars/mars/deploy/oscar/session.py in _fetch(tileable, session, *tileables, **kwargs)
   1375         tileable, tileables = tileable[0], tileable[1:]
   1376     session = _get_isolated_session(session)
-> 1377     data = await session.fetch(tileable, *tileables, **kwargs)
   1378     return data[0] if len(tileables) == 0 else data
   1379

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(self, *tileables, **kwargs)
    807             fetch_infos_list = []
    808             for tileable in tileables:
--> 809                 fetch_tileable, indexes = self._get_to_fetch_tileable(tileable)
    810                 chunk_to_slice = None
    811                 if indexes is not None:

~/Workspace/mars/mars/deploy/oscar/session.py in _get_to_fetch_tileable(self, tileable)
    751                 break
    752             else:
--> 753                 raise ValueError(f'Cannot fetch unexecuted '
    754                                  f'tileable: {tileable}')
    755

ValueError: Cannot fetch unexecuted tileable: DataFrame(op=DataFrameDataSource)

If no cluster URL is passed to new_session, a local session will be created.

For distributed execution, the URL of the cluster's Web UI can be passed to new_session to connect to an existing cluster.

>>> import mars
>>> mars.new_session('http://<web_ip>:<web_port>')
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # submit to Mars cluster
   0  1
0  1  2
1  3  4