Session

A session can be used to run computations locally, to connect to a local cluster, or to connect to an existing Mars cluster.

If a session is not initialized explicitly, Mars will create a session for local execution by default.

>>> import mars.dataframe as md
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # will create a default session for local execution
   0  1
0  1  2
1  3  4
>>> df.fetch()
   0  1
0  1  2
1  3  4
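The fallback to a default local session can be pictured with a small toy model. This is a hypothetical sketch, not Mars's implementation: ToySession, default_or_local and execute below are illustrative stand-ins. The traceback later on this page does show Mars's own fetch using a similar `if session is None: session = Session.default_or_local()` check.

```python
class ToySession:
    """Hypothetical stand-in for a session; not the real mars.session.Session."""

    _default = None  # shared default session, created lazily on first use

    def __init__(self, address=None):
        # address=None models a session for local execution
        self.address = address

    @classmethod
    def default_or_local(cls):
        # Create a local session the first time one is needed, then reuse it
        if cls._default is None:
            cls._default = cls()
        return cls._default


def execute(obj, session=None):
    # Toy execute: fall back to the lazily created default session when none is given
    if session is None:
        session = ToySession.default_or_local()
    return session


first = execute("df")
second = execute("df")
print(first is second, first.address)  # True None
```

Both calls get the same session object: the default is created once and then reused.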

Use new_session to create a new session. Once created, a session can be passed as the session argument to both execute and fetch.

>>> from mars.session import new_session
>>> import mars.tensor as mt
>>> sess = new_session()
>>> t = mt.random.rand(3, 2)
>>> t.execute(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
>>> t.fetch(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
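In the output above, fetch returns exactly the random values that execute produced: fetching retrieves a result the session already holds instead of recomputing it. The following is a hypothetical toy model of that behavior; the class, its dictionary store and the string keys are illustrative assumptions, not Mars internals.

```python
import random


class ToySession:
    """Hypothetical session that keeps computed results; not Mars's actual class."""

    def __init__(self):
        self._results = {}  # maps a key naming the object -> its computed result

    def execute(self, key, compute):
        # Run the computation once and remember the result in this session
        self._results[key] = compute()
        return self._results[key]

    def fetch(self, key):
        # Return the stored result without recomputing it
        return self._results[key]


sess = ToySession()
executed = sess.execute("t", lambda: [random.random() for _ in range(3)])
fetched = sess.fetch("t")
print(executed == fetched)  # True: fetch does not draw new random numbers
```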

Calling .as_default() on a session makes it the default session; subsequent calls to .execute() and .fetch() without a session argument then run against that session.

>>> from mars.session import new_session
>>> new_session().as_default()  # set default session
<mars.session.Session at 0x1a33bbeb50>
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # execute on the session just created
   0  1
0  1  2
1  3  4
>>> df.fetch()  # fetch from the session just created
   0  1
0  1  2
1  3  4
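The effect of as_default can be modeled as a single process-wide "current default" slot that an explicit session argument overrides. This is a hypothetical sketch, not Mars code: ToySession and execute are illustrative names, and the toy execute returns the session's name only to make the choice visible. Returning self from as_default mirrors the Session object echoed in the example above.

```python
class ToySession:
    """Hypothetical session with a process-wide default; not Mars's real class."""

    _default = None  # the session currently registered as default

    def __init__(self, name):
        self.name = name

    def as_default(self):
        # Register this session as the default and return it, so calls can chain
        ToySession._default = self
        return self


def execute(obj, session=None):
    # An explicit session wins; otherwise use the current default,
    # creating a local one first if no default exists yet
    if session is None:
        if ToySession._default is None:
            ToySession("local").as_default()
        session = ToySession._default
    return session.name  # toy result: report which session was used


ToySession("a").as_default()
print(execute("df"))             # a
b = ToySession("b")
print(execute("df", session=b))  # b
print(execute("df"))             # a
```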

Sessions are isolated from each other. Calling .fetch() on a Mars object that was executed in a different session fails with a ValueError, because the session being asked holds no result for that object.

>>> from mars.session import new_session
>>> sess = new_session()
>>> df.fetch(session=sess)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-7-f10708ec743f> in <module>
----> 1 df.fetch(session=sess)

~/Workspace/mars/mars/core.py in fetch(self, session, **kw)
    377         if session is None:
    378             session = Session.default_or_local()
--> 379         return session.fetch(self, **kw)
    380
    381     def _attach_session(self, session):

~/Workspace/mars/mars/session.py in fetch(self, *tileables, **kw)
    427             ret_list = True
    428
--> 429         result = self._sess.fetch(*tileables, **kw)
    430
    431         ret = []

~/Workspace/mars/mars/session.py in fetch(self, n_parallel, *tileables, **kw)
    114         if n_parallel is None:
    115             kw['n_parallel'] = cpu_count()
--> 116         return self._executor.fetch_tileables(tileables, **kw)
    117
    118     def create_mutable_tensor(self, name, shape, dtype, fill_value=None, *args, **kwargs):

~/Workspace/mars/mars/utils.py in _wrapped(*args, **kwargs)
    383                 _kernel_mode.eager = False
    384             _kernel_mode.eager_count = enter_eager_count + 1
--> 385             return func(*args, **kwargs)
    386         finally:
    387             _kernel_mode.eager_count -= 1

~/Workspace/mars/mars/executor.py in fetch_tileables(self, tileables, **kw)
    919                 # check if the tileable is executed before
    920                 raise ValueError(
--> 921                     'Tileable object {} to fetch must be executed first'.format(tileable))
    922
    923         # if chunk executed, fetch chunk mechanism will be triggered in execute_tileables

ValueError: Tileable object    0  1
0  1  2
1  3  4 to fetch must be executed first
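The isolation can be illustrated with a toy model in which every session owns a private result store. This is a hypothetical sketch, not Mars internals: ToySession is an illustrative name, and its error message is only modeled on the one in the traceback above.

```python
class ToySession:
    """Hypothetical session with its own private result store (not Mars internals)."""

    def __init__(self):
        self._results = {}

    def execute(self, key, value):
        # Store the "computed" value in this session only
        self._results[key] = value
        return value

    def fetch(self, key):
        # Results live per session, so an unknown key means "not executed here"
        if key not in self._results:
            raise ValueError(f"Tileable object {key} to fetch must be executed first")
        return self._results[key]


sess_a, sess_b = ToySession(), ToySession()
sess_a.execute("df", [[1, 2], [3, 4]])
print(sess_a.fetch("df"))  # [[1, 2], [3, 4]]
try:
    sess_b.fetch("df")
except ValueError as exc:
    print(exc)  # Tileable object df to fetch must be executed first
```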

If no argument is passed to new_session, a local session is created. A local session uses a threaded scheduler for execution.
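Mars's real scheduler works on a graph of chunks and is considerably more involved. The hypothetical sketch below (chunked_sum is an illustrative name, not a Mars function) only shows the underlying idea of a threaded scheduler: evaluate independent chunks on a thread pool, then combine the partial results. The worker count follows the `n_parallel = cpu_count()` default visible in the traceback above.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def chunked_sum(data, chunk_size):
    # Split the input into independent chunks, as a chunked array would be
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    # One worker per CPU; os.cpu_count() may return None, so fall back to 1
    workers = os.cpu_count() or 1
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Each chunk is summed on a pool thread
        partial_sums = list(pool.map(sum, chunks))
    # Combine the per-chunk results into the final answer
    return sum(partial_sums)


print(chunked_sum(list(range(10)), chunk_size=3))  # 45
```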

For distributed execution, pass the URL of the cluster's Web UI to new_session to connect to an existing cluster.

>>> from mars.session import new_session
>>> new_session('http://<web_ip>:<web_port>').as_default()
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # submit to Mars cluster
   0  1
0  1  2
1  3  4
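The choice between a local and a distributed session comes down to whether an address is supplied. The helper below is a hypothetical sketch, not part of Mars: describe_session, its URL check and the accepted schemes are assumptions made for illustration, and 192.0.2.10:8080 is a placeholder documentation address, not a Mars default.

```python
from urllib.parse import urlsplit


def describe_session(address=None):
    """Hypothetical helper, not part of Mars: classify what a session address targets."""
    if address is None:
        # No address: a local session backed by a threaded scheduler
        return "local"
    parts = urlsplit(address)
    # Expect a Web UI URL of the form http://<web_ip>:<web_port>
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise ValueError(f"not a Web UI URL: {address!r}")
    return f"distributed via {parts.scheme}://{parts.netloc}"


print(describe_session())                          # local
print(describe_session("http://192.0.2.10:8080"))  # distributed via http://192.0.2.10:8080
```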