Task 服务#

准备执行图#

当一个 Tileable 图被提交到 Mars Scheduler,一张包含更细粒度的,由 Operand 和 Chunk 构成的图将根据数据源中包含的 chunks 参数被生成。

图融合#

当完成 Chunk 图的生成后,我们将会通过合并图中相邻的节点来减小图的规模,这一合并也能让我们充分利用 numexpr 这样的加速库来加速计算过程。目前 Mars 仅会合并形成单条链的 Operand。例如,当执行下面的代码

import mars.tensor as mt
a = mt.random.rand(100, chunk_size=100)
b = mt.random.rand(100, chunk_size=100)
c = (a + b).sum()

Mars 将会合并 Operand ADD 和 SUM 成为 FUSE 节点。RAND Operand 不会被合并,因为它们并没有和 ADD 及 SUM 组成一条简单的直线。

../../_images/compose.svg

初始 Worker 分配#

为 Operand 分配 Worker 对于图执行的性能而言至关重要。随机分配初始 Operand 可能导致巨大的网络开销,并有可能导致不同 Worker 间作业分配的不平衡。因为非初始节点的分配能够根据其前驱生成数据的物理分布及各个 Worker 的空闲情况方便地确定,在执行图准备阶段,我们只考虑初始 Operand 的分配问题。

初始 Worker 分配需要遵循几个准则。首先,分配给每个 Worker 执行的 Operand 需要尽量保持平衡,这能够使计算集群在整个执行阶段都有较高的利用率,这在执行的最后阶段显得尤其重要。其次,初始节点分配需要使后续节点执行时的网络”传输尽量小。也就是说,初始点分配需要充分遵循局部性原则。

需要注意的是,上述准则在某些情况下会彼此冲突。一个网络传输量最小的分配方案可能会非常偏斜。我们开发了一套启发式算法来获取两个目标的平衡,该算法描述如下:

  1. 选择列表中的第一个初始节点和第一台机器;

  2. 从 Operand 图转换出的无向图中自该点开始进行深度优先搜索;

  3. 如果另一个未被分配的初始节点被访问到,我们将其分配给步骤1中选择的机器;

  4. 当访问到的 Operand 总数大于平均每个 Worker 接受的 Operand 个数时,停止分配;

  5. 前往步骤1,如果仍有 Worker 未被分配 Operand,否则结束。

配置#

task:
    default_config:
        optimize_tileable_graph: true
        optimize_chunk_graph: true
        fuse_enabled: true

接口#

TaskAPI(session_id, task_manager_ref)

WebTaskAPI(session_id, address[, ...])