Session
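Sessions can be used to execute locally, to connect to a local cluster, or to connect to an existing Mars cluster.
If no session is created explicitly, Mars creates a default session for local execution.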
>>> import mars.dataframe as md
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute() # will create a default session for local execution
   0  1
0  1  2
1  3  4
>>> df.fetch()
   0  1
0  1  2
1  3  4
Use new_session to create a new session. The execute and fetch methods take a session argument that specifies which session to use.
>>> from mars.session import new_session
>>> import mars.tensor as mt
>>> sess = new_session()
>>> t = mt.random.rand(3, 2)
>>> t.execute(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
>>> t.fetch(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
Calling .as_default() on a session makes it the default session, so .execute() and .fetch() use it whenever no session is passed.
>>> from mars.session import new_session
>>> new_session().as_default() # set default session
<mars.session.Session at 0x1a33bbeb50>
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute() # execute on the session just created
   0  1
0  1  2
1  3  4
>>> df.fetch() # fetch from the session just created
   0  1
0  1  2
1  3  4
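The way a session is chosen, as described above, comes down to one rule: an explicit session argument wins; otherwise the default session is used, and a local one is created on first use if none exists yet. The sketch below models that rule with two hypothetical classes, ToySession and ToyTileable, written only for illustration; they are not Mars's implementation and make no Mars calls. The method name default_or_local is borrowed from the Session.default_or_local() call visible in the traceback further below, but its body here is an assumption.

```python
import itertools

_next_key = itertools.count()  # hypothetical: unique keys for toy objects


class ToySession:
    """Hypothetical stand-in for a session: keeps its own results by key."""

    _default = None  # the current default session, shared by all instances

    def __init__(self, name):
        self.name = name
        self._results = {}

    def as_default(self):
        # Make this session the one used when no session is passed.
        ToySession._default = self
        return self

    @classmethod
    def default_or_local(cls):
        # Lazily create a local session the first time one is needed.
        if cls._default is None:
            cls._default = cls("local-default")
        return cls._default

    def execute(self, tileable):
        self._results[tileable.key] = tileable.compute()
        return self._results[tileable.key]


class ToyTileable:
    """Hypothetical stand-in for a lazily evaluated Mars object."""

    def __init__(self, fn):
        self.key = next(_next_key)
        self._fn = fn

    def compute(self):
        return self._fn()

    def execute(self, session=None):
        # An explicit session argument wins; otherwise use the default.
        if session is None:
            session = ToySession.default_or_local()
        return session.execute(self)
```

For example, ToyTileable(lambda: 42).execute() creates and uses the lazy local default, while passing session=ToySession("s1") runs in s1 and leaves the default unchanged; in this model only as_default() changes which session is the default.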
Sessions are isolated from each other: calling .fetch() on a Mars object that was executed in a different session fails with a ValueError.
>>> from mars.session import new_session
>>> sess = new_session()
>>> df.fetch(session=sess)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-7-f10708ec743f> in <module>
----> 1 df.fetch(session=sess)

~/Workspace/mars/mars/core.py in fetch(self, session, **kw)
    377         if session is None:
    378             session = Session.default_or_local()
--> 379         return session.fetch(self, **kw)
    380
    381     def _attach_session(self, session):

~/Workspace/mars/mars/session.py in fetch(self, *tileables, **kw)
    427             ret_list = True
    428
--> 429         result = self._sess.fetch(*tileables, **kw)
    430
    431         ret = []

~/Workspace/mars/mars/session.py in fetch(self, n_parallel, *tileables, **kw)
    114         if n_parallel is None:
    115             kw['n_parallel'] = cpu_count()
--> 116         return self._executor.fetch_tileables(tileables, **kw)
    117
    118     def create_mutable_tensor(self, name, shape, dtype, fill_value=None, *args, **kwargs):

~/Workspace/mars/mars/utils.py in _wrapped(*args, **kwargs)
    383             _kernel_mode.eager = False
    384             _kernel_mode.eager_count = enter_eager_count + 1
--> 385             return func(*args, **kwargs)
    386         finally:
    387             _kernel_mode.eager_count -= 1

~/Workspace/mars/mars/executor.py in fetch_tileables(self, tileables, **kw)
    919                 # check if the tileable is executed before
    920                 raise ValueError(
--> 921                     'Tileable object {} to fetch must be executed first'.format(tileable))
    922
    923         # if chunk executed, fetch chunk mechanism will be triggered in execute_tileables

ValueError: Tileable object    0  1
0  1  2
1  3  4 to fetch must be executed first
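Isolation can be pictured as each session keeping its own store of results, keyed by object. The toy sketch below assumes exactly that; ToySession, ToyTileable and the key counter are hypothetical names for illustration, not Mars internals. Only the error text is modeled on the ValueError message in the traceback above.

```python
import itertools

_next_key = itertools.count()  # hypothetical: unique keys for toy objects


class ToyTileable:
    """Hypothetical stand-in for a Mars object: a key plus a computation."""

    def __init__(self, fn):
        self.key = next(_next_key)
        self._fn = fn

    def compute(self):
        return self._fn()

    def __repr__(self):
        return f"ToyTileable(key={self.key})"


class ToySession:
    """Hypothetical session with a private result store."""

    def __init__(self):
        self._store = {}  # results live only inside this session

    def execute(self, tileable):
        self._store[tileable.key] = tileable.compute()
        return self._store[tileable.key]

    def fetch(self, tileable):
        # Fetching only works for objects executed in *this* session.
        if tileable.key not in self._store:
            raise ValueError(
                f"Tileable object {tileable!r} to fetch must be executed first")
        return self._store[tileable.key]
```

Executing t in one ToySession and fetching it from another raises ValueError, because the second session's store never received a result for t's key.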
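When called without arguments, new_session creates a local session, which executes with the multi-threaded scheduler.
For a distributed environment, pass the URL of the cluster's Web UI to new_session to connect to the cluster.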
>>> from mars.session import new_session
>>> new_session('http://<web_ip>:<web_port>').as_default()
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute() # submit to Mars cluster
   0  1
0  1  2
1  3  4
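The choice between a local session and a cluster connection can be sketched as a dispatch on whether a URL is given. In this sketch, toy_new_session, LocalToySession and ClusterToySession are hypothetical; no connection is attempted, and the check that the URL uses http or https is an assumption added for illustration, not documented Mars behavior.

```python
from urllib.parse import urlparse


class LocalToySession:
    """Hypothetical: stands in for a local session (multi-threaded scheduler)."""
    kind = "local"


class ClusterToySession:
    """Hypothetical: stands in for a session bound to a cluster's Web UI URL.

    No network connection is made; the URL is only validated and stored.
    """
    kind = "cluster"

    def __init__(self, web_url):
        parsed = urlparse(web_url)
        # Assumption for illustration: require an http(s) URL with a host.
        if parsed.scheme not in ("http", "https") or not parsed.netloc:
            raise ValueError(f"expected an http(s) Web UI URL, got {web_url!r}")
        self.web_url = web_url


def toy_new_session(web_url=None):
    # No argument: local session; a Web UI URL: session for that cluster.
    if web_url is None:
        return LocalToySession()
    return ClusterToySession(web_url)
```

Here toy_new_session() yields a local session and toy_new_session("http://127.0.0.1:8000") a cluster-bound one, mirroring the two ways new_session is called above.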