会话

会话可以用来本地执行,连接到本地集群,以及连接到已经创建的 Mars 集群。

如果没有显式创建会话,Mars 会默认创建一个本地执行的会话。

>>> import mars.dataframe as md
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # will create a default session for local execution
   0  1
0  1  2
1  3  4
>>> df.fetch()
   0  1
0  1  2
1  3  4

new_session 可以用来创建新的默认会话。

>>> import mars
>>> mars.new_session()  # create a new default session
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # execute on the session just created
   0  1
0  1  2
1  3  4
>>> df.fetch()  # fetch from the session just created
   0  1
0  1  2
1  3  4

也可以通过 new_session(default=False) 来创建新的非默认会话,.execute() 和 .fetch() 方法可以通过 session 参数显式指定会话。

>>> import mars
>>> import mars.tensor as mt
>>> sess = mars.new_session(default=False)
>>> t = mt.random.rand(3, 2)
>>> t.execute(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])
>>> t.fetch(session=sess)
array([[0.9956293 , 0.06604185],
       [0.25585635, 0.98183162],
       [0.04446616, 0.2417941 ]])

调用会话的 .as_default() 方法会把这个会话设为默认,之后 .execute() 和 .fetch() 就默认会使用这个会话。

每个会话是隔离的,试图通过 .fetch() 读取别的会话中执行过的 Mars 对象会导致失败。

>>> import mars
>>> sess = mars.new_session(default=False)
>>> df.fetch(session=sess)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-7-f10708ec743f> in <module>
----> 1 df.fetch(session=sess)

~/Workspace/mars/mars/dataframe/core.py in fetch(self, session, **kw)
    525             return self._fetch(session=session, **kw)
    526         else:
--> 527             batches = list(self._iter(batch_size=batch_size,
    528                                       session=session, **kw))
    529             return pd.concat(batches) if len(batches) > 1 else batches[0]

~/Workspace/mars/mars/dataframe/core.py in _iter(self, batch_size, session, **kw)
    509                 yield batch_data._fetch(session=session, **kw)
    510         else:
--> 511             yield self._fetch(session=session, **kw)
    512
    513     def iterbatch(self, batch_size=1000, session=None, **kw):

~/Workspace/mars/mars/core/entity/executable.py in _fetch(self, session, **kw)
    120         session = _get_session(self, session)
    121         self._check_session(session, 'fetch')
--> 122         return fetch(self, session=session, **kw)
    123
    124     def fetch(self, session: SessionType = None, **kw):

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(tileable, session, *tileables, **kwargs)
   1391
   1392     session = _ensure_sync(session)
-> 1393     return session.fetch(tileable, *tileables, **kwargs)
   1394
   1395

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(self, *tileables, **kwargs)
   1240     def fetch(self, *tileables, **kwargs) -> list:
   1241         coro = _fetch(*tileables, session=self._isolated_session, **kwargs)
-> 1242         return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
   1243
   1244     @implements(AbstractSyncSession.decref)

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/miniconda3/envs/mars3.8/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/Workspace/mars/mars/deploy/oscar/session.py in _fetch(tileable, session, *tileables, **kwargs)
   1375         tileable, tileables = tileable[0], tileable[1:]
   1376     session = _get_isolated_session(session)
-> 1377     data = await session.fetch(tileable, *tileables, **kwargs)
   1378     return data[0] if len(tileables) == 0 else data
   1379

~/Workspace/mars/mars/deploy/oscar/session.py in fetch(self, *tileables, **kwargs)
    807             fetch_infos_list = []
    808             for tileable in tileables:
--> 809                 fetch_tileable, indexes = self._get_to_fetch_tileable(tileable)
    810                 chunk_to_slice = None
    811                 if indexes is not None:

~/Workspace/mars/mars/deploy/oscar/session.py in _get_to_fetch_tileable(self, tileable)
    751                 break
    752             else:
--> 753                 raise ValueError(f'Cannot fetch unexecuted '
    754                                  f'tileable: {tileable}')
    755

ValueError: Cannot fetch unexecuted tileable: DataFrame(op=DataFrameDataSource)

如果 new_session 没有参数,会创建本地会话。本地会话会利用多线程调度器执行。

对于分布式环境,new_session 调用时需要传入 Web UI 的 URL 以连接到集群。

>>> import mars
>>> mars.new_session('http://<web_ip>:<web_port>')
>>> df = md.DataFrame([[1, 2], [3, 4]])
>>> df.execute()  # submit to Mars cluster
   0  1
0  1  2
1  3  4