暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Aeron是如何实现的?—— Conductor

BUG侦探 2021-11-10
885


接上文
《Aeron是什么?》
《Aeron中这么多空闲策略选哪个?

0. 简介

最近我们用 Aeron 实现了 Mesh agent 与 sdk 之间的共享内存通信,但是在使用过程中越来越感觉到 Aeron 框架太重了,其中很大部分功能完全用不到,有些想要自定义的逻辑很难在现有框架中实现。
所以我们计划深入到 Aeron 源码中,看看它是如何实现的,最终尝试实现一个轻量的 Mesh 共享内存通信类库。
再回过头来看一下这张架构图:
Media Driver 组件负责管理数据通过媒介(UDP 或 IPC)的发送与接收。其中的 Sender 和 Receiver 负责数据的发送与接收,Driver Conductor 接收 Aeron Client 的命令,协调 Media Driver 的动作。
本文将分析一下 Driver Conductor 的实现,以及与 Client Conductor 的交互逻辑。

1. Driver Conductor 主干逻辑

Driver Conductor 的逻辑封装在 io.aeron.driver.DriverConductor 类中,该类是一个 org.agrona.concurrent.Agent,所以直接看一下 doWork() 方法:
  1. processTimers(nowNs) 处理心跳和超时

  2. clientCommandAdapter.receive() 处理来自 Client Conductor 的命令(也就是上图中的红线)

  3. driverCmdQueue.drain(...) 处理来自 Sender 和 Receiver 的命令(也就是上图中的蓝线)

  4. 维护 publisher 的 limit 位置

  5. name resolver

本文主要分析第 2 部分,其它暂且不看。

2. Client Conductor --> Driver Conductor

Driver Conductor 处理命令的逻辑封装在 io.aeron.driver.ClientCommandAdapter 中。业务逻辑在 onMessage 方法中,也就是针对不同 msgTypeId 调用对应的处理函数。
命令传输的逻辑封装在 toDriverCommands 中,他是一个 RingBuffer,其初始化逻辑在 io.aeron.driver.MediaDriver.ContextconcludeDependantProperties() 方法中:
可见就是将 cncByteBuffer 的一部分包装为ManyToOneRingBuffer,Client Conductor 写入命令,Driver Conductor 处理命令。

2.1 cncByteBuffer 是什么?

cncByteBuffer 是 ${aeronDirectory}/cnc.dat 文件 mmap 映射的内存,而${aeronDirectory} 默认是在/dev/shm/下,也就是说这是一块共享内存。
具体看一下初始化逻辑,位于io.aeron.driver.MediaDriver.Contextconclude() 方法中:
mmap 映射的逻辑在 org.agrona.IoUtilmapNewFile 方法中:
暂且不再深究操作系统细节,回到 cnc.dat 文件,其格式定义在 io.aeron.CncFileDescriptor 中:
toDriverCommands也就是基于 ”to-driver Buffer“ 包装的ManyToOneRingBuffer。Client Conductor 通过 cnc.dat 中的这部分共享内存向 Driver Conductor 发命令。

2.2 ManyToOneRingBuffer 是个啥?

顾名思义就是个环形缓冲区,状态信息存储在 buffer 末尾:
  1. tailPosition 维护尾部的位置信息

  2. headPosition 和 headCachePosition 维护头部的位置信息

  3. correlationIdCounter 用于生成唯一的命令 ID,用于关联命令响应

  4. consumerHeartbeat 消费端的心跳,也就是消费时更新个时间戳(更新的逻辑就位于上面提到的processTimers(nowNs) 方法中)

2.3 Client Conductor 如何发送命令?

Client Conductor 向 Driver Conductor ”建连接“ 的逻辑位于io.aeron.Aeron.ContextconnectToDriver中:
此处用”建连接“这个词并不准确,该步骤的主要逻辑是判断driverTimeoutMs时间(默认10s)内 Driver 是否准备就绪。
判断标准有三个,在超时时间内:
  1. cnc.dat 文件可以映射

  2. cncVersion 信息不为空,也就是已经初始了

  3. to-driver Buffer 有心跳,也就是 Driver 的消费逻辑正常启动了

发送命令的逻辑比较清晰,以ADD_PUBLICATION为例:

3. Driver Conductor --> Client Conductor

上面是 Client Conductor 向 Driver Conductor 发送命令的逻辑,接下来看一下 Driver Conductor 向 Client Conductor 回复响应的逻辑。
实际上实现方式是一样的,只场景稍微有些区别,所以用到的工具类不一样。
Driver Conductor 向 Client Conductor 回复响应用的是 cnc.dat 中的 to-clients Buffer,具体封装在io.aeron.driver.ClientProxy 中,初始化逻辑为:
由于存在多个 Aeron Client 连接一个 Media Driver,所以回复响应使用的是一个广播的工具类org.agrona.concurrent.broadcast.BroadcastTransmitter,Client 根据 id 匹配需要的响应消息。
Client Conductor 对应的消费逻辑封装在io.aeron.DriverEventsAdapter中,消费用到的广播工具类为org.agrona.concurrent.broadcast.BroadcastReceiver
这一对广播工具类的细节就不展开了。




🔍
作者介绍
🕵️‍♀️
ABOUT US
有 趣 灵 魂 集 结 地
思 路 总 结 大 本 营

SCAN AND FOLLOW US NOW
@BUG侦探


文章转载自BUG侦探,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论