暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Aeron是如何实现的?—— Ipc Publication

BUG侦探 2021-11-24
584

接上文
《Aeron是什么?》
《Aeron中这么多空闲策略选哪个?
《Aeron是如何实现的?—— Conductor

0. 简介

最近我们用 Aeron 实现了 Mesh agent 与 sdk 之间的共享内存通信,但是在使用过程中越来越感觉到 Aeron 框架太重了,其中很大部分功能完全用不到,有些想要自定义的逻辑很难在现有框架中实现。
所以我们计划深入到 Aeron 源码中,看看它是如何实现的,最终尝试实现一个轻量的 Mesh 共享内存通信类库。
上文分析了 Conductor 的逻辑,本文继续分析 Ipc Publication 的逻辑。

1. Driver Conductor - add[Exclusive]Publication

在发送数据之前,需要先向 Driver Conductor 发送ADD_[EXCLUSIVE_]PUBLICATION命令,让 Driver 初始化通信的共享内存结构。
至于 Conductor 交互的逻辑不再赘述,直接看 Driver Conductor 处理 ADD_[EXCLUSIVE_]PUBLICATION命令的逻辑。处理逻辑的入口在 io.aeron.driver.ClientCommandAdapter
这里只关心IPC_CHANNEL,进入io.aeron.driver.DriverConductor
三个主要步骤:
  1. 创建IpcPublication,核心就是 "${aeronDirectory}/${correlationId}.logbuffer" 共享内存文件;

  2. 返回ON_[EXCLUSIVE_]PUBLICATION_READY消息;

  3. 向所有相关的 subscribers 发送ON_AVAILABLE_IMAGE消息。(subscribers 收到该消息就会读取共享内存进行消费,具体逻辑下篇分析)

1.1 ${correlationId}.logbuffer 共享内存文件

logbuffer 文件结构的定义在io.aeron.logbuffer.LogBufferDescriptor中:
上面三个 Term 用于传输数据,这三个 Term 轮转使用,Aeron Cookbook 上有个动画演示,看起来这种设计主要有利于 UDP 数据流重建。
下面的 Meta Data 比较复杂,具体字段用到的时候再详细看:

2. Client Conductor - onNew[Exclusive]Publication

client 收到ON_[EXCLUSIVE_]PUBLICATION_READY消息后构建相应的[Exclusive|Concurrent]Publication,该类封装了发送消息的逻辑。
其中ConcurrentPublication支持并发发送数据,但是性能不如ExclusivePublication 好。
看一下父类io.aeron.Publication的构造方法:
再看一下io.aeron.ExclusivePublication的构造方法(ConcurrentPublication 的主干逻辑类似,这里就不详细看了):
大部分参数比较直观,termAppenders封装了具体 Term 的写入逻辑。
上面的提到的 Term 轮转用法具体到代码中,就是activePartitionIndextermBeginPosition这两个变量的维护。
首先看一下选哪个 Term:
  1. 从 Log Meta Data 中取出 Active Term Count (termCount)

  2. termCount按照PARTITION_COUNT(3)取余得到当前的 Term 索引 (index)

然后看一下从哪个位置开始写:
  1. 从 Log Meta Data 中获取对应的的 Tail Counter #index (rawTail)

  2. rawTail的高 32 位是 termId,低 32 位是termOffset

  3. termBeginPosition = (termId - initialTermId) << positionBitsToShift,其中positionBitsToShift这步操作本质上就是乘 Term 的长度

最后需要解释一下positionLimit,这个值读取的是 cnc.dat 中的一个 Counters Buffer,用于表示可以写入的位置限制,主要作用是传递 Subscriber 的消费能力,用于背压。(这个值的更新在 Driver 中,下篇分析 Subscription 时再详细看)
这个信息可以通过io.aeron.samples.AeronStat工具查看。其 label 是 "pub-lmt: ${registrationId} ${sessionId} ${streamId} ${channel}",其中的 registrationId 就是 logbuffer 文件名的 correlationId。

3. ExclusivePublication - offer

发送数据有两个方法offertryClaim
看一下ExclusivePublicationoffer方法(ConcurrentPublication的主干逻辑与之类似,只是多了些并发控制):
首先如果写入的位置 position 大于等于limit,也就是消费的能力更不上写入了,那么产生背压(ConcurrentPublication由于存在并发的情况,所以并不是严格限制,但是不会超过该 term):
这里还有个细节maxPossiblePosition,主要是限制 termCount 这个 int 值不要溢出。

3.1 termAppender.append[Un]fragmentedMessage 写入数据

对于正常可以写入的情况,如果写入的数据小于 MTU,那么调用 appendUnfragmentedMessage
首先更新 Tail Counter #index。
如果写入的长度大于本 term 剩余可写空间了,那么在handleEndOfLogCondition方法中处理异常情况:
剩余空间填充一个PADDING_FRAME_TYPE的消息,然后返回FAILED(-1)
如果本 term 剩余可写空间足够,那么依次写入 header、reservedValue 和数据。
关于长度字段的写入,有个小细节很有意思,先写入一个负值,最后再写入一个正值。通过这种方式可以保证 subscriber 读到“正值”时,数据已经全部写入了。
对于数据大于 MTU 的场景,就需要对数据分段了,其它的逻辑跟上面是一样的,只是通过BEGIN_FRAG_FLAGEND_FRAG_FLAG标识了消息。

3.2 newPosition 更新位置

数据发送成功后,通过newPosition方法更新位置信息:
如果写入成功了,那么直接更新本地变量termOffset即可。
如果写入失败了,又不是超过最大位置的场景,那么就需要轮转 term 了。
更新 Log Meta Data 中的 Active Term Count 和 Tail Counter #next,以及相关的本地变量。

4. tryClaim

最后看一下tryClaim方法,主干逻辑与offer一致。
区别在于:offer 时,数据已经准备好,所以在处理逻辑中直接写入;tryClaim 时,只是预先占着位置,上层通过BufferClaim写入数据,最后commit时,提交“正值”的长度字段。



🔍
作者介绍
🕵️‍♀️
ABOUT US
有 趣 灵 魂 集 结 地
思 路 总 结 大 本 营

SCAN AND FOLLOW US NOW
@BUG侦探


文章转载自BUG侦探,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论