Worker 中的执行细节

一个 Mars Worker 包含多个进程,以减少全局解释器锁(GIL)对执行的影响。具体的执行在独立的进程中完成。为减少不必要的内存拷贝和进程间通讯,Mars Worker 使用共享内存来存储执行结果。

当一个作业被提交到 Worker,它将首先被分配内存。此后,Operand 依赖的其他 Worker 上的数据,或者当前 Worker 上已被 spill 到磁盘的数据将会被重新载入内存中。此时,所有计算需要的数据已经都在内存中,真正的计算过程将启动。当计算完成,Worker 将会把作业放到共享存储空间中。这四种执行状态的转换关系见下图。

../../_images/worker-states.svg

执行控制

Mars Worker 通过 ExecutionActor 控制**所有** Operand 在 Worker 中的执行。该 Actor 本身并不参与实际运算或者数据传输,只是向其他 Actor 提交任务。

Scheduler 中的 OperandActor 通过 ExecutionActor 上的 execute_graph 调用向 Worker 提交作业。此后,add_finish_callback 将被调用以注册一个回调。这一设计允许执行结果被多个位置接收,这对故障恢复有价值。

ExecutionActor 使用 mars.promise 模块来同时处理多个 Operand 的执行请求。具体的执行步骤通过 Promise 类的 then 方法相串联。当最终的执行结果被存储,之前注册的回调将被触发。如果在之前的任意执行步骤中发生错误,该错误会被传导到最后 catch 方法注册的处理函数中并得到处理。

Operand 的排序

所有在 READY 状态的 Operand 都被提交到 Scheduler 选择的 Worker 中。因此,在执行的绝大多数时间里,提交到 Worker 的 Operand 个数通常都高于单个 Worker 能够处理的 Operand 总数。因此,Worker 需要对 Operand 进行排序,此后选择一部分 Worker 来执行。这一排序过程在 TaskQueueActor 中进行,该 Actor 中维护一个优先队列,其中存储 Operand 的相关信息。与此同时,TaskQueueActor 定时运行一个作业分配任务,对处于优先队列头部的 Operand 分配执行资源直至没有多余的资源来运行 Operand,这一分配过程也会在新 Operand 提交或者 Operand 执行完成时触发。

内存管理

Mars Worker 管理两部分内存。第一部分是每个 Worker 进程私有的内存空间,由每个进程自己持有。第二部分是所有进程共享的内存空间,由 Apache Arrow 中的 plasma_store 持有。

为了避免进程内存溢出,我们引入了 Worker 级别的 QuotaActor,用于分配进程内存。当一个 Operand 开始执行前,它将为输入和输出 Chunk 向 QuotaActor 发送批量内存请求。如果剩余的内存空间可以满足请求,该请求会被 QuotaActor 接受。否则,请求将排队等待空闲资源。当相关内存使用被释放,请求的资源会被释放,此时,QuotaActor 能够为其他 Operand 分配资源。

共享内存由 plasma_store 管理,通常会占据整个内存的 50%。由于不存在溢出的可能,这部分内存无需经过 QuotaActor 而是直接通过 plasma_store 的相关方法进行分配。当共享内存使用殆尽,Mars Worker 会尝试将一部分不在使用的 Chunk spill 到磁盘中,以腾出空间容纳新的 Chunk。

从共享内存 spill 到磁盘的 Chunk 数据可能会被未来的 Operand 重新使用,而从磁盘重新载入共享内存的操作可能会非常耗费 IO 资源,尤其在共享内存已经耗尽,需要 spill 其他 Chunk 到磁盘以容纳载入的 Chunk 时。因此,当数据共享并不需要时,例如该 Chunk 只会被一个 Operand 使用,我们会将 Chunk 直接载入进程私有内存中,而不是共享内存,这可以显著减少作业总执行时间。