本文主要介绍一下 Ray 的内部的实现机制
Ray 架构
Ray 在设计上也是使用了 Master-Slave 架构,Master 节点负责全局协调和状态维护,Slave 负责具体的任务执行,Ray 架构图如下所示(图片来自 Paper: Real-Time Machine Learning: The Missing Pieces[1]):

Distributed Ray Overview[2],当前 Ray 的版本中,Master 节点叫做 head node,其他节点叫做 worker node。
这里简单看下,其中比较关键的组件:
1.GlobalScheduler:Master 上启动了一个全局调度器,用户与接收 LocalScheduler 提交的任务,并将任何分配给合适的 Node 执行;2.GlobalControlStore(GCS):它维护整个集群的状态信息,与其他系统的区别是它的 Scale 能力,它可以做到独立部署,并且横向扩展,官方默认是把这个状态信息储存到 Redis 中;3.LocalScheduler(在最新的版本中这个组件叫做 raylet):每个 Node 上启动了一个 LocalScheduler,用于提交任务到 GlobalScheduler 以及分配任务到当前 Node 的 worker 进程上;4.Worker:每个 Node 上可以启动多个 Worker 进程执行分布式任务,并将计算结果保存到 ObjectStore 中;5.ObjectStore:每个 Node 上启动了一个 ObjectStore 存储只读对象,Node 的 worker 之间通过共享内存的方式访问这些对象数据;6.Plasma:每个 Node 上的 ObjectStore 都是由 Plasma 来管理,它可以在 worker 访问本地不存在的远程数据对象时,主动拉取其他 Node 上的对象到本地机器;
在提交作业上,Ray 的设计与 Spark 有些类似,都是通过 Driver 提交的,Driver 可以运行在本地(打开 Python Shell 运行 Ray 作业)、也可以运行在 Node 上(作为一个 worker 运行)。
Ray 的两大抽象:Task 和 Actor
在 RL 场景中,它既需要无状态、短暂的计算模式,又需要有状态、长时连续运行的计算模式,Ray 在设计时,就考虑了同时满足这两种需求,因此,Ray 在计算模型上提供了对 Task 和 Actor 的统一抽象。Task 应用于无状态的计算,它的容错及 load balance 都比较简单;Actor 应用于有状态的计算,它的容错和 load balance 就复杂了一些。
Task
无状态的计算
# 定一个 remote function,这里可以在 remote() 中添加相应的配置@ray.remote(num_cpus=1, num_gpus=0, num_return_vals=2)def segment(image, threshold=128):dark = image < thresholdbright = image > thresholdreturn dark, bright# 在集群中异步执行一个 task# 会立刻返回一个 future,它实际上就是一个 ObjectID,唯一标识这个 taskfuture = imread.remote('/data/python.png')# 返回这个 task 的执行结果,block 直到结果返回arr = ray.get(future)
Actor
有状态的计算,根据调用有顺序地进行计算,无需加锁(这个也是 Actor 模型的特点)
# 通过 Python class 注册一个 Actor@ray.remoteclass ParameterServer(object):def __init__(self, keys, values):values = [value.copy() for value in values] self.weights = dict(zip(keys, values))def push(self, keys, values):for key, value in zip(keys, values):self.weights[key] += valuedef pull(self, keys):return [self.weights[key] for key in keys]# 初始化这个 actorps = ParameterServer.remote(keys, initial_values)# 执行这个 actor 的相关方法# actor 方法调用是顺序执行的,这个模型非常简单,不会有加锁相应的操作future0 = ps.push.remote(keys, grads0)future1 = ps.push.remote(keys, grads1)future2 = ps.grab.pull(keys)
在实际的应用中,Actor 与 Task 它们是可以组合在一起使用的,作业内的这种组合也就构成了这个作业的 Graph,只不过在 Ray 中,这个 Graph 并不是固定的。
Remote Task exectute
下面有一个示例图(图片来自 Ray: A Distributed Framework for Emerging AI Applications[3]),介绍了一个 Task 是如何调度运行(a 存储在 Node N1 上,b 存储在 Node N2 上,这个执行逻辑跟 Actor 其实没有本质区别,都是类似的):

其执行步骤如下(在图中的第 0 步中,它会先将 add
函数注册到 GCS 中,个人理解这个应该不会将 add
提前广播到所有结果,应该调度到哪个节点,该节点再从 GCS 获取):
1.执行 `id = add.remote(a,b)`` 函数,先往 Local Scheduler 进行调度;2.Local Scheduler 发现本地只有 a
,没有 b
,向 Global Scheduler 求助;3.Global Scheduler 查询 GCS 中的 Object Table 获取 b
的位置;4.Global Scheduler 决定将这个 task 调度到 b
所在节点 N2;5.N2 的 Local Scheduler 发现本地没有 a
;6.N2 向 GCS 中的 Object Table 查询 a
的位置;7.N2 从 a
的 Object Store 中拷贝到本地的 Object Store;8.执行函数;
而调用 get
结果获取执行结果的流程如下图(图片来自 Ray: A Distributed Framework for Emerging AI Applications[3]):

其执行步骤如下:
1.使用 c
的 feature 请求 local object store;2.由于 N1 的 local object store 没有 c
的缓存,它会向 GCS 求助,但是此时 c
的计算还没完成,因此 N1 的 Object store 会在 GCS 的 Object Table 中注册一个 callback
,当 c
的结果完成后会通知 N1 的 Object Store;3.此时,N2 完成计算逻辑,它会把 c
存储到 N2 的 Local Object Store 中;4.N2 的 Local Object Store 在存储 c
时,也会把结果同步到 GCS,告诉 GCS c
的结果现在存储在 N2 节点中;5.GCS 触发 N1 Object Store 存储的 callback
;6.N1 从 N2 节点将 c
拷贝过来;7.返回结果;
容错机制
它的容错是基于 GCS 存储的状态来做的,可以分为以下两种情况:
1.Non Actors:根据血缘关系重新构造上下游关系,直接恢复即可;2.Actors:就需要依赖 Checkpoint 及数据重放来恢复了。
调度实现
Ray 的一个目标是实现每秒百万级任务调度,为此设计了两级调度器,包括全局调度器和每个节点上的本地调度器(当前叫做 raylet)。为了降低全部调度器的负载,节点 (worker,actor) 上派生的任务首先提交给 LocalScheduler,如果本地的资源无法满足,则会把待调度的任务提交给 GlobalScheduler(图片来自:Ray: A Distributed Framework for Emerging AI Applications[3]):

GlobalScheduler 会根据每个节点的负载和任务的需求进行调度。决策依据有:
1.每个 Node 上任务队列的大小;2.Node 上任务的排队时间;3.任务需要的数据传输到该节点所需的时间;
当 GlobalScheduler 出现瓶颈时,Ray 会实例化更多的 GlobalScheduler 来分担工作,这得益于 GCS,使得 GlobalScheduler 可扩展。
这里简单介绍了一下 Ray 的内部实现机制,因此没有去研究源码,上面的总结主要是从论文和官网获取,可能跟现在的代码实现有一些差异,但是核心设计的思想不会有太多变化。
References
[1]
Paper: Real-Time Machine Learning: The Missing Pieces: https://arxiv.org/pdf/1703.03924.pdf[2]
Distributed Ray Overview: https://docs.ray.io/en/latest/cluster/index.html?highlight=Raylet[3]
Ray: A Distributed Framework for Emerging AI Applications: https://www.usenix.org/system/files/osdi18-moritz.pdf




