容错

当下,Mars 在两个层面上有容错支持:进程级别容错和 Worker 级别容错,尚未实现 Scheduler 级别容错。

进程级别容错

Mars Worker 在执行时为多进程。当一个进程因内存溢出等原因终止,且该进程不是控制 Actor 所在的 0 号进程,Mars 将会把所有该进程上执行的 Operand 标记为失败,启动另一个进程,并在其上恢复之前进程上的所有 Actor 并重启失败的 Operand。

Worker 容错

注解

自 0.2.0a2 起支持

因为 Mars 使用执行图来调度 Operand,当一部分 Worker 不再相应时,Scheduler 将会找出丢失的 Chunk,进而找到受影响的 Operand。此后,这部分 Operand 将被重新调度。

故障消息传递和处理

当一个 Worker 停止响应,其他 Worker 及 Scheduler 中的各个 Actor 将感知这一变化并将之回馈给 ResourceActor,此后 ResourceActor 将会把 Worker 列表中的改变广播到所有 Session 中。SessionActor 在收到这些消息后,将会收集当前 Session 中丢失的数据所对应的 key,并将收集到的所有信息转发给各个正在运行的 GraphActor,并在 GraphActor 中执行真正的故障恢复操作。

整个故障通知流程如下图。

../../_images/worker-failover-notification.svg

当收到故障恢复调用时,GraphActor 将首先尝试重新分配受故障影响 Operand 的状态并重新分配初始节点的 Worker,此后向相关的 OperandActor 发送恢复请求。

状态重置

当某些 Worker 失去联系,在这些 Worker 中存储的数据即告丢失。因而,我们需要改变相关 Operand 的状态,以使它们能重新运行。由于存在 Operand 相关数据的充要条件是 Operand 状态为 FINISHED,正如 Operand 状态 一节所说的那样,我们设计了一个两阶段的扫描过程来计算受影响 Operand 的目标状态。

  1. 扫描所有状态为 FINISHED 且所带数据丢失的 Operand,将它们及它们的后继节点标记为受影响;

  2. 从受影响的 Operand 开始。从底部向上扫描 Graph;

  3. 扫描每个受影响 Operand 的前驱。如果前驱的数据丢失,或处于一个不持有数据的状态中(例如 FREED 或者 UNSCHEDULED),前驱将被标记为受影响;

  4. 如果当前节点没有前驱受影响,则将该节点的新状态指定为 READY,否则指定为 UNSCHEDULED

  5. 当所有受影响 Operand 均已处理完成,则停止,否则继续前往步骤 2。

重新分配初始 Worker

当一部分 Worker 失去联系,一些在 Graph 初始化 时被指派 Worker 的初始节点可能失去指派的 Worker。与此同时,Worker 数量的变化也会导致 Worker 分配的不平衡。我们使用一种自适应的 Worker 重分配算法来解决这一问题。该算法框架与最初的初始节点分配算法一致,区别在于我们并不访问已经被执行过的初始节点,同时原算法步骤 3 中停止条件所涉及的访问节点范围限制为平均到每个 Worker 的节点数减去已经在该 Worker 中执行过的节点数。