Session

A session can be used to execute locally, to connect to a local cluster, or to connect to an existing Mars cluster.

If no session is created explicitly, Mars creates a default session for local execution.

>>> import mars.dataframe as md
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # will create a default session for local execution
   0  1
0  1  2
1  3  4
>>> df.fetch()
   0  1
0  1  2
1  3  4

Use new_session to create a new session. The execute and fetch methods accept a session argument that specifies which session to use.

>>> from mars.session import new_session
>>> import mars.tensor as mt
>>> sess = new_session()
>>> t = mt.random.rand(3, 2)
>>> t.execute(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
>>> t.fetch(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])

Calling a session's .as_default() method sets that session as the default; .execute() and .fetch() then use it when no session is specified.

>>> from mars.session import new_session
>>> new_session().as_default()  # set default session
<mars.session.Session at 0x1a33bbeb50>
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # execute on the session just created
   0  1
0  1  2
1  3  4
>>> df.fetch()  # fetch from the session just created
   0  1
0  1  2
1  3  4

Sessions are isolated from each other. Calling .fetch() on a Mars object that was executed in a different session fails.

>>> from mars.session import new_session
>>> sess = new_session()
>>> df.fetch(session=sess)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-7-f10708ec743f> in <module>
----> 1 df.fetch(session=sess)

~/Workspace/mars/mars/core.py in fetch(self, session, **kw)
    377         if session is None:
    378             session = Session.default_or_local()
--> 379         return session.fetch(self, **kw)
    380
    381     def _attach_session(self, session):

~/Workspace/mars/mars/session.py in fetch(self, *tileables, **kw)
    427             ret_list = True
    428
--> 429         result = self._sess.fetch(*tileables, **kw)
    430
    431         ret = []

~/Workspace/mars/mars/session.py in fetch(self, n_parallel, *tileables, **kw)
    114         if n_parallel is None:
    115             kw['n_parallel'] = cpu_count()
--> 116         return self._executor.fetch_tileables(tileables, **kw)
    117
    118     def create_mutable_tensor(self, name, shape, dtype, fill_value=None, *args, **kwargs):

~/Workspace/mars/mars/utils.py in _wrapped(*args, **kwargs)
    383                 _kernel_mode.eager = False
    384             _kernel_mode.eager_count = enter_eager_count + 1
--> 385             return func(*args, **kwargs)
    386         finally:
    387             _kernel_mode.eager_count -= 1

~/Workspace/mars/mars/executor.py in fetch_tileables(self, tileables, **kw)
    919                 # check if the tileable is executed before
    920                 raise ValueError(
--> 921                     'Tileable object {} to fetch must be executed first'.format(tileable))
    922
    923         # if chunk executed, fetch chunk mechanism will be triggered in execute_tileables

ValueError: Tileable object    0  1
0  1  2
1  3  4 to fetch must be executed first
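
The isolation behavior above can be pictured with a toy model. The sketch below is not Mars's implementation: ToySession and ToyObject are hypothetical classes invented for illustration, and only the name default_or_local is borrowed from the traceback. It assumes each session keeps its own table of executed results, which is enough to reproduce the "must be executed first" failure.

```python
class ToySession:
    """Hypothetical stand-in for a session; it owns its own results."""

    _default = None  # process-wide default session (assumption of this sketch)

    def __init__(self):
        self._results = {}  # maps id(obj) -> value computed in this session

    def as_default(self):
        # Register this session as the default and return it.
        ToySession._default = self
        return self

    @classmethod
    def default_or_local(cls):
        # Lazily create a default session when none was set explicitly.
        if cls._default is None:
            cls._default = cls()
        return cls._default

    def execute(self, obj):
        value = obj.compute()
        self._results[id(obj)] = value
        return value

    def fetch(self, obj):
        # Results executed in another session are invisible here.
        if id(obj) not in self._results:
            raise ValueError("object to fetch must be executed first")
        return self._results[id(obj)]


class ToyObject:
    """Hypothetical stand-in for a Mars object such as a DataFrame."""

    def __init__(self, data):
        self.data = data

    def compute(self):
        return list(self.data)

    def execute(self, session=None):
        if session is None:
            session = ToySession.default_or_local()
        return session.execute(self)

    def fetch(self, session=None):
        if session is None:
            session = ToySession.default_or_local()
        return session.fetch(self)


obj = ToyObject([1, 2, 3])
obj.execute()              # runs on the implicitly created default session
other = ToySession()       # a second, isolated session
try:
    obj.fetch(session=other)
    fetch_failed = False
except ValueError:
    fetch_failed = True    # the other session never executed obj
```

In this model, executing the object on the second session first (obj.execute(session=other)) makes the later fetch succeed, which mirrors how the error message in the traceback suggests resolving the failure.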

If new_session is called without arguments, it creates a local session. A local session runs on the threaded scheduler.

In a distributed environment, pass the URL of the Web UI to new_session to connect to the cluster.

>>> from mars.session import new_session
>>> new_session('http://<web_ip>:<web_port>').as_default()
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # submit to Mars cluster
   0  1
0  1  2
1  3  4
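
The choice between a local session and a cluster session can be wrapped in a small helper. This is a minimal sketch under stated assumptions: web_ui_url and connect are hypothetical names that are not part of Mars, and the host and port in the usage comment are placeholders. The only Mars calls used are new_session, with and without a URL, and .as_default(), as shown earlier in this section.

```python
def web_ui_url(host, port, scheme="http"):
    """Build the Web UI URL in the form used above (hypothetical helper)."""
    return f"{scheme}://{host}:{port}"


def connect(host=None, port=None):
    """Set and return a default session (hypothetical helper).

    Without a host and port, a local session is created; otherwise the
    session connects to the cluster behind the given Web UI address.
    """
    # Imported lazily so that web_ui_url stays usable without Mars installed.
    from mars.session import new_session

    if host is None or port is None:
        return new_session().as_default()
    return new_session(web_ui_url(host, port)).as_default()


# Example usage (placeholder address, requires a running Mars cluster):
# sess = connect("192.0.2.10", 7103)
```

Keeping the URL construction separate from the connection makes the address easy to check or log before any session is created.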