暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Ray 之内部实现机制

柳年思水 2020-08-29
1825

本文主要介绍一下 Ray 的内部的实现机制

Ray 架构

Ray 在设计上也是使用了 Master-Slave 架构,Master 节点负责全局协调和状态维护,Slave 负责具体的任务执行,Ray 架构图如下所示(图片来自 Paper: Real-Time Machine Learning: The Missing Pieces[1]):

Distributed Ray Overview[2],当前 Ray 的版本中,Master 节点叫做 head node,其他节点叫做 worker node。

这里简单看下,其中比较关键的组件:

1.GlobalScheduler:Master 上启动了一个全局调度器,用户与接收 LocalScheduler 提交的任务,并将任何分配给合适的 Node 执行;2.GlobalControlStore(GCS):它维护整个集群的状态信息,与其他系统的区别是它的 Scale 能力,它可以做到独立部署,并且横向扩展,官方默认是把这个状态信息储存到 Redis 中;3.LocalScheduler(在最新的版本中这个组件叫做 raylet):每个 Node 上启动了一个 LocalScheduler,用于提交任务到 GlobalScheduler 以及分配任务到当前 Node 的 worker 进程上;4.Worker:每个 Node 上可以启动多个 Worker 进程执行分布式任务,并将计算结果保存到 ObjectStore 中;5.ObjectStore:每个 Node 上启动了一个 ObjectStore 存储只读对象,Node 的 worker 之间通过共享内存的方式访问这些对象数据;6.Plasma:每个 Node 上的 ObjectStore 都是由 Plasma 来管理,它可以在 worker 访问本地不存在的远程数据对象时,主动拉取其他 Node 上的对象到本地机器;

在提交作业上,Ray 的设计与 Spark 有些类似,都是通过 Driver 提交的,Driver 可以运行在本地(打开 Python Shell 运行 Ray 作业)、也可以运行在 Node 上(作为一个 worker 运行)。

Ray 的两大抽象:Task 和 Actor

在 RL 场景中,它既需要无状态、短暂的计算模式,又需要有状态、长时连续运行的计算模式,Ray 在设计时,就考虑了同时满足这两种需求,因此,Ray 在计算模型上提供了对 Task 和 Actor 的统一抽象。Task 应用于无状态的计算,它的容错及 load balance 都比较简单;Actor 应用于有状态的计算,它的容错和 load balance 就复杂了一些。

Task

无状态的计算

# 定一个 remote function,这里可以在 remote() 中添加相应的配置
@ray.remote(num_cpus=1, num_gpus=0, num_return_vals=2)
def segment(image, threshold=128):
dark = image < threshold
bright = image > threshold
return dark, bright


# 在集群中异步执行一个 task
# 会立刻返回一个 future,它实际上就是一个 ObjectID,唯一标识这个 task
future = imread.remote('/data/python.png')


# 返回这个 task 的执行结果,block 直到结果返回
arr = ray.get(future)

Actor

有状态的计算,根据调用有顺序地进行计算,无需加锁(这个也是 Actor 模型的特点)

# 通过 Python class 注册一个 Actor
@ray.remote
class ParameterServer(object):
def __init__(self, keys, values):
values = [value.copy() for value in values] self.weights = dict(zip(keys, values))


def push(self, keys, values):
for key, value in zip(keys, values):
self.weights[key] += value


def pull(self, keys):
return [self.weights[key] for key in keys]


# 初始化这个 actor
ps = ParameterServer.remote(keys, initial_values)


# 执行这个 actor 的相关方法
# actor 方法调用是顺序执行的,这个模型非常简单,不会有加锁相应的操作
future0 = ps.push.remote(keys, grads0)
future1 = ps.push.remote(keys, grads1)
future2 = ps.grab.pull(keys)

在实际的应用中,Actor 与 Task 它们是可以组合在一起使用的,作业内的这种组合也就构成了这个作业的 Graph,只不过在 Ray 中,这个 Graph 并不是固定的。

Remote Task exectute

下面有一个示例图(图片来自 Ray: A Distributed Framework for Emerging AI Applications[3]),介绍了一个 Task 是如何调度运行(a 存储在 Node N1 上,b 存储在 Node N2 上,这个执行逻辑跟 Actor 其实没有本质区别,都是类似的):

其执行步骤如下(在图中的第 0 步中,它会先将 add
 函数注册到 GCS 中,个人理解这个应该不会将 add
 提前广播到所有结果,应该调度到哪个节点,该节点再从 GCS 获取):

1.执行 `id = add.remote(a,b)`` 函数,先往 Local Scheduler 进行调度;2.Local Scheduler 发现本地只有 a
,没有 b
,向 Global Scheduler 求助;
3.Global Scheduler 查询 GCS 中的 Object Table 获取 b
 的位置;
4.Global Scheduler 决定将这个 task 调度到 b
 所在节点 N2;
5.N2 的 Local Scheduler 发现本地没有 a
6.N2 向 GCS 中的 Object Table 查询 a
 的位置;
7.N2 从 a
 的 Object Store 中拷贝到本地的 Object Store;
8.执行函数;

而调用 get
 结果获取执行结果的流程如下图(图片来自 Ray: A Distributed Framework for Emerging AI Applications[3]):

其执行步骤如下:

1.使用 c
 的 feature 请求 local object store;
2.由于 N1 的 local object store 没有 c
 的缓存,它会向 GCS 求助,但是此时 c
 的计算还没完成,因此 N1 的 Object store 会在 GCS 的 Object Table 中注册一个 callback
,当 c
 的结果完成后会通知 N1 的 Object Store;
3.此时,N2 完成计算逻辑,它会把 c
 存储到 N2 的 Local Object Store 中;
4.N2 的 Local Object Store 在存储 c
 时,也会把结果同步到 GCS,告诉 GCS c
 的结果现在存储在 N2 节点中;
5.GCS 触发 N1 Object Store 存储的 callback
6.N1 从 N2 节点将 c
 拷贝过来;
7.返回结果;

容错机制

它的容错是基于 GCS 存储的状态来做的,可以分为以下两种情况:

1.Non Actors:根据血缘关系重新构造上下游关系,直接恢复即可;2.Actors:就需要依赖 Checkpoint 及数据重放来恢复了。

调度实现

Ray 的一个目标是实现每秒百万级任务调度,为此设计了两级调度器,包括全局调度器和每个节点上的本地调度器(当前叫做 raylet)。为了降低全部调度器的负载,节点 (worker,actor) 上派生的任务首先提交给 LocalScheduler,如果本地的资源无法满足,则会把待调度的任务提交给 GlobalScheduler(图片来自:Ray: A Distributed Framework for Emerging AI Applications[3]):

GlobalScheduler 会根据每个节点的负载和任务的需求进行调度。决策依据有:

1.每个 Node 上任务队列的大小;2.Node 上任务的排队时间;3.任务需要的数据传输到该节点所需的时间;

当 GlobalScheduler 出现瓶颈时,Ray 会实例化更多的 GlobalScheduler 来分担工作,这得益于 GCS,使得 GlobalScheduler 可扩展。

这里简单介绍了一下 Ray 的内部实现机制,因此没有去研究源码,上面的总结主要是从论文和官网获取,可能跟现在的代码实现有一些差异,但是核心设计的思想不会有太多变化。

References

[1]
 Paper: Real-Time Machine Learning: The Missing Pieces: https://arxiv.org/pdf/1703.03924.pdf
[2]
 Distributed Ray Overview: https://docs.ray.io/en/latest/cluster/index.html?highlight=Raylet
[3]
 Ray: A Distributed Framework for Emerging AI Applications: https://www.usenix.org/system/files/osdi18-moritz.pdf


文章转载自柳年思水,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论