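Sessions can be used for local execution, for connecting to a local cluster, or for connecting to an existing Mars cluster.
If no session is created explicitly, Mars creates a default session for local execution.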
>>> import mars.dataframe as md
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # will create a default session for local execution
   0  1
0  1  2
1  3  4
>>> df.fetch()
   0  1
0  1  2
1  3  4
Use new_session to create a new session. The execute and fetch methods accept a session argument that selects which session to use.
>>> from mars.session import new_session
>>> import mars.tensor as mt
>>> sess = new_session()
>>> t = mt.random.rand(3, 2)
>>> t.execute(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
>>> t.fetch(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
Calling a session's .as_default() method makes it the default session. After that, .execute() and .fetch() use this session unless another one is passed explicitly.
>>> from mars.session import new_session
>>> new_session().as_default()  # set default session
<mars.session.Session at 0x1a33bbeb50>
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # execute on the session just created
   0  1
0  1  2
1  3  4
>>> df.fetch()  # fetch from the session just created
   0  1
0  1  2
1  3  4
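The lookup order described above (an explicit session first, otherwise the default, otherwise a newly created local one) can be pictured with a small toy model. This is only a sketch of the idea, not Mars code: `ToySession` and `resolve` are hypothetical names, and the real library may organise this differently.

```python
# Toy model of default-session resolution; NOT the Mars implementation.
# ToySession and resolve are hypothetical names used for illustration only.
from typing import Optional


class ToySession:
    _default: Optional["ToySession"] = None  # process-wide default session

    def as_default(self) -> "ToySession":
        """Register this session as the default and return it."""
        ToySession._default = self
        return self

    @classmethod
    def default_or_local(cls) -> "ToySession":
        """Return the default session, creating a local one on first use."""
        if cls._default is None:
            cls._default = cls()
        return cls._default


def resolve(session: Optional[ToySession] = None) -> ToySession:
    """Pick the session a call runs on: explicit argument first, then default."""
    return session if session is not None else ToySession.default_or_local()


implicit = resolve()                  # no default yet: one is created lazily
explicit = ToySession()
assert resolve(explicit) is explicit  # an explicit session always wins
assert resolve() is implicit          # the default is unchanged so far
explicit.as_default()
assert resolve() is explicit          # later calls now use the new default
```

The point of the model is that `as_default()` changes only what later calls pick up when no session is passed; a call that names a session is unaffected.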
Sessions are isolated from each other. Calling .fetch() with one session on a Mars object that was executed in a different session fails.
>>> from mars.session import new_session
>>> sess = new_session()
>>> df.fetch(session=sess)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-7-f10708ec743f> in <module>
----> 1 df.fetch(session=sess)

~/Workspace/mars/mars/core.py in fetch(self, session, **kw)
    377         if session is None:
    378             session = Session.default_or_local()
--> 379         return session.fetch(self, **kw)
    380
    381     def _attach_session(self, session):

~/Workspace/mars/mars/session.py in fetch(self, *tileables, **kw)
    427             ret_list = True
    428
--> 429         result = self._sess.fetch(*tileables, **kw)
    430
    431         ret = []

~/Workspace/mars/mars/session.py in fetch(self, n_parallel, *tileables, **kw)
    114         if n_parallel is None:
    115             kw['n_parallel'] = cpu_count()
--> 116         return self._executor.fetch_tileables(tileables, **kw)
    117
    118     def create_mutable_tensor(self, name, shape, dtype, fill_value=None, *args, **kwargs):

~/Workspace/mars/mars/utils.py in _wrapped(*args, **kwargs)
    383             _kernel_mode.eager = False
    384             _kernel_mode.eager_count = enter_eager_count + 1
--> 385             return func(*args, **kwargs)
    386         finally:
    387             _kernel_mode.eager_count -= 1

~/Workspace/mars/mars/executor.py in fetch_tileables(self, tileables, **kw)
    919                 # check if the tileable is executed before
    920                 raise ValueError(
--> 921                     'Tileable object {} to fetch must be executed first'.format(tileable))
    922
    923         # if chunk executed, fetch chunk mechanism will be triggered in execute_tileables

ValueError: Tileable object    0  1
0  1  2
1  3  4 to fetch must be executed first
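One way to picture why the fetch fails is to assume that each session keeps its own record of the objects it has executed. The sketch below models only that assumption. `ToySession` and `ToyTileable` are hypothetical stand-ins, not Mars classes, and the error text is shortened from the message in the traceback.

```python
# Toy model of session isolation; NOT the Mars implementation.
# ToySession and ToyTileable are hypothetical names used for illustration only.


class ToyTileable:
    """Stand-in for a lazily evaluated Mars object."""

    def __init__(self, data):
        self.data = data


class ToySession:
    def __init__(self):
        self._results = {}  # results are stored per session, keyed by object id

    def execute(self, tileable):
        """Record the result in this session only and return it."""
        self._results[id(tileable)] = tileable.data
        return tileable.data

    def fetch(self, tileable):
        """Return a stored result; fail if this session never executed it."""
        if id(tileable) not in self._results:
            raise ValueError("Tileable object to fetch must be executed first")
        return self._results[id(tileable)]


first, second = ToySession(), ToySession()
t = ToyTileable([[1, 2], [3, 4]])
first.execute(t)
print(first.fetch(t))    # the session that executed the object can fetch it

try:
    second.fetch(t)      # another session has never seen this object
except ValueError as exc:
    print("fetch failed:", exc)
```

In this model the remedy is the same as in Mars: execute the object in the session you intend to fetch from, or pass the session that already executed it.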
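If new_session is called without arguments, it creates a local session. A local session runs on the threaded scheduler.
In a distributed environment, pass the URL of the Web UI to new_session to connect to the cluster.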
>>> from mars.session import new_session
>>> new_session('http://<web_ip>:<web_port>').as_default()
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # submit to Mars cluster
   0  1
0  1  2
1  3  4
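When the same script has to run both locally and against a cluster, the choice can be wrapped in a small helper. The following is a minimal sketch under stated assumptions: `build_web_url` and `open_default_session` are hypothetical helpers that are not part of Mars, and the only Mars calls used are `new_session(...)` and `.as_default()` as they appear in the examples above.

```python
# Sketch of choosing between a local and a cluster session.
# build_web_url and open_default_session are hypothetical helpers, not Mars
# APIs; only new_session(...) and .as_default() come from the examples above.
from typing import Optional


def build_web_url(web_ip: str, web_port: int) -> str:
    """Format the Web UI address in the form used by the cluster example."""
    return f"http://{web_ip}:{web_port}"


def open_default_session(web_ip: Optional[str] = None,
                         web_port: Optional[int] = None):
    """Create a session and make it the default.

    Without a complete address this falls back to a local session.
    """
    from mars.session import new_session  # imported lazily; requires Mars

    if web_ip is None or web_port is None:
        return new_session().as_default()
    return new_session(build_web_url(web_ip, web_port)).as_default()
```

Calling `open_default_session()` with no arguments gives local execution. Passing the host and port of the cluster's Web UI submits later `.execute()` calls to that cluster.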