本文描述Kafka复制协议中涉及的关键数据结构和算法。
1. 存储在Zookeeper中Path(元数据)
说明:
如果一个路径中包含了一个类似[xyz]的标识,表示xyz的数值不是固定的。例如,/topics/[topic]表示一个名称为/topics的目录,包含了以topic名称命名的多个子目录。一个“->”符号表示一个znode的内容,例如,/hello -> world表示名称为/hello的znode,包含的数值为“hello”。除非标记为ephemeral节点,否则一个节点是永久节点。
Zookeeper中存储了以下的Path:
/brokers/ids/[broker_id] -->host:port(ephemeral,由admin创建)
存储活跃brokers的信息./brokers/topics/[topic]/[partition_id]/replicas--> {broker_id ...}(由admin创建)
每一个partition分开存储,存储分配给这个partition的当前replicas。每一个replica我们存储其对应的broker id。第一个replica是首选的replica,注意:对于一个特定的partition,在一个broker上最多只有一个replica,因此我们可以使用broker id作为replica id。/brokers/topics/[topic]/[partition_id]/leader--> broker_id (ephemeral) (由leader创建)
存储当前partition的leader replica id。/brokers/topics/[topic]/[partition_id]/ISR--> {broker_id, ...} (由leader创建)
存储当前partition的ISR副本的replicas id(可能有多个replica和leader处于同步状态)/brokers/partitions_reassigned/[topic]/[partition_id]--> {broker_id ...} (由admin创建)
这个path用于我们需要将某些partition分配给其他broker集合。对于每一个需要重新分配的partition,存储一个新replicas的列表以及其对应的brokers。这个path由admin进程创建,并且在成功执行完分配操作后删除。
2. 关键数据结构
每一个broker存储一个与其对应的partitions列表,以及分配给它的replicas。partition的当前leader进一步维护3个集合:AR,ISR,CUR和RAR,分别对应为:分配给这个partition的replicas,和leader保持同步的replica集合,尚未同步正在同步的replica集合,以及正在重新分配到其他broker的replica集合。通常,ISR ⊆ AR,AR = ISR + CUR。当前partition的leader维护一个commitQ,使用这个Queue缓冲所有client发送过来准备提交的消息。broker针对分配给它的每一个replica,周期性地将replica的HW存储到一个checkpoint文件。

3. 关键算法
Zookeeper节点监听:
Leader-change监听:
监听/brokers/topics/[topic]/[partition_id]/leader目录的变化Replica-change监听:
/brokers/topics目录子节点的变化 (新topic注册)/brokers/topics/[topic]目录子节点的变化 (新partition注册)/brokers/topics/[topic]/[partition_id]/replicas目录的变化 (分配新的replica)Partition-reassigned监听:
/brokers/partitions_reassigned目录子节点的变化/brokers/partitions_reassigned/[topic]目录子节点的变化
配置参数:
LeaderElectionWaitTime:
设置我们等待选主过程完成的最大时间。KeepInSyncTime:
设置Leader将一个follower从ISR中移除的最大时间(超过这个时间将从ISR中移除)。
Broker启动:
当一个broker启动的时候,它调用 brokerStartup()函数,并且执行Figure 1和Figure 2中描述的算法。

其中很关键的环节是在replicaStateChange中对High Watermark的处理:
r.hw = min(last checked HW for r, r.leo)
也就是将这个replica的HW设置为已经和Leader确认的HW和本地LEO的最小值。这就是后面不断更新复制协议来补可能丢失已经Commit消息的源头之一。
truncate r's log to r.hw
在becomeFollower中,Follower将本地消息truncate到本地HW,注意这个HW可能是在上面replicaStateChange过程中修复产生的。

第一版的复制协议,描述得很模糊,从LeaderElection函数中可以看到,算法只说明了ISR不为空的情况,而其实这正好是一个集群宕机后恢复最简单的场景。后来的unclean.leader.election.enable参数就是为了简化这个环节的处理过程,这个在后续我们再说。
becomeLeader(r: Replica, ISR: Set[Replica])
这是很关键的过程,等待其他“活跃”的replicas和目标replica同步,完成可能的同步后,计算出ISR、CUR、RAR并记录到Zookeeper中去。
Client发送生产请求:
当Broker接收到一个生产请求,它调用produce()函数,并执行Figure3和Figure4中描述的算法。

produce函数比较简单,仅仅是把消息放到CommitQ中,由后台专门的committer线程来负责提交数据。
首先尝试将数据同步给ISR集合中的replica,如果ISR集合中的某个replica在配置的时间内没有完成和Leader的同步,将被从ISR集合中移除。相反,如果一个在CUR中的replica,同步进度已经追赶上了Leader,将被加入到ISR集合中。判断是否同步的标准是:
r.leo >= pr.offset
最终ack给produce client也是在committer线程中完成的。

如果有被重新分配的replica,检查RAR中是否存在replica,其数据已经完成和Leader同步,尝试切换到新分配的replica上去。checkLoadBalancing函数处理是否应该将Leader切换到最合适的replica上去。判断的条件是当前Leader不是最优选择,且最优候选在ISR集合中。
Follower从leader获取数据:
Follower不断向Leader发送ReplicaFetcherRequests请求。 Figure 5中分别描述了Leader端和Follower端的处理流程。

Leader的处理流程是:
当收到ReplicaFetchRequest后,更新对应replica的LEO,从leader的log中读取请求Offset对应的消息,同时在response中附带leader的HW。
Follower的处理流程是:
循环执行下面的步骤:
发送ReplicaFetchRequest
接收Leader的response
将消息append到本地Log
使用response中的leader HW更新本地HW
更新本地offset
On leader-change event:
Broker会为分配给它的每一个replica注册一个到ZK的监听,监听leader-change的事件。当leader-change事件触发的时候,broker调用Figure 6中描述的onLeaderChange()的函数。

大致流程为:
如果onLeaderChange选出了一个新Leader,那么判断本地replica是否就是这个新Leader自己(所有的Replica都会触发这个回调,包含新Leader自己)。如果当前replica不是新Leader,则调用becomeFollower函数。
如果onLeaderChange告知当前replica原来的Leader消失了,在当前节点尚未被删除之前(有可能当前replica尤其其他原因已经不再是这个partition的一个replica了),触发leaderElection操作。
On replica-change event:
当replica-change事件触发的时候,broker调用Figure 7中描述的onReplicaChange()函数。

比较简单分为:
一个replica被加入到当前broker
一个replica从当前broker移除
两种情况分别处理。
On partition-reassigned event:
当partition-reassigned事件触发的时候, broker调用Figure 8中描述的onPartitionReassigned()函数。

4. 管理操作流程
新增一个topic或者新增一个partition:
在ZK的/brokers/topics/[topic]/[partition_id]/replicas目录下新增一个或者多个子目录.
一个或者多个replica-change事件会被触发,listener开启每一个replica的初始化引导流程。
向集群中加入新节点:
管理流程决定哪些partition需要重新分配给新的broker,在ZK的/brokers/partitions_reassigned/[topic]/[partition_id]目录下,新增一个或者多个子目录。
每一个broker都会收到一个partition-reassigned事件。如果一个replica被分配给了一个broker,broker会启动初始化引导流程去创建一个replica。这个partition的当前leader会将这个新replica加入到这个partition的RAR集合中,同时监控它是否和leader同步。一旦这个replica同步完成,leader将这个新的replica更新到ZK中,然后触发一次leaderelection。
Leader election完成后,其中一个新replica会变成leader,同时其他的replicas会变成follower。在触发相应的replica-change事件后,旧的replica会被删除掉。




