暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka复制协议V1

数据库技术汇 2021-04-25
1369


本文描述Kafka复制协议中涉及的关键数据结构和算法。

1. 存储在Zookeeper中Path(元数据)

说明: 

如果一个路径中包含了一个类似[xyz]的标识,表示xyz的数值不是固定的。例如,/topics/[topic]表示一个名称为/topics的目录,包含了以topic名称命名的多个子目录。一个“->”符号表示一个znode的内容,例如,/hello -> world表示名称为/hello的znode,包含的数值为“hello”。除非标记为ephemeral节点,否则一个节点是永久节点。

 

Zookeeper中存储了以下的Path:

  • /brokers/ids/[broker_id] -->host:port(ephemeral,由admin创建)
    存储活跃brokers的信息.

  • /brokers/topics/[topic]/[partition_id]/replicas--> {broker_id ...}(由admin创建)

    每一个partition分开存储,存储分配给这个partition的当前replicas。每一个replica我们存储其对应的broker id。第一个replica是首选的replica,注意:对于一个特定的partition,在一个broker上最多只有一个replica,因此我们可以使用broker id作为replica id。

  • /brokers/topics/[topic]/[partition_id]/leader--> broker_id (ephemeral) (由leader创建)

    存储当前partition的leader replica id。

  • /brokers/topics/[topic]/[partition_id]/ISR--> {broker_id, ...} (由leader创建)

    存储当前partition的ISR副本的replicas id(可能有多个replica和leader处于同步状态)

  • /brokers/partitions_reassigned/[topic]/[partition_id]--> {broker_id ...} (由admin创建)

    这个path用于我们需要将某些partition分配给其他broker集合。对于每一个需要重新分配的partition,存储一个新replicas的列表以及其对应的brokers。这个path由admin进程创建,并且在成功执行完分配操作后删除。

2. 关键数据结构


每一个broker存储一个与其对应的partitions列表,以及分配给它的replicas。partition的当前leader进一步维护3个集合:AR,ISR,CUR和RAR,分别对应为:分配给这个partition的replicas,和leader保持同步的replica集合,尚未同步正在同步的replica集合,以及正在重新分配到其他broker的replica集合。通常,ISR ⊆ AR,AR = ISR + CUR。当前partition的leader维护一个commitQ,使用这个Queue缓冲所有client发送过来准备提交的消息。broker针对分配给它的每一个replica,周期性地将replica的HW存储到一个checkpoint文件。



3. 关键算法


Zookeeper节点监听:

  • Leader-change监听:
    监听/brokers/topics/[topic]/[partition_id]/leader目录的变化

  • Replica-change监听:
    /brokers/topics目录子节点的变化 (新topic注册)/brokers/topics/[topic]目录子节点的变化 (新partition注册)/brokers/topics/[topic]/[partition_id]/replicas目录的变化 (分配新的replica)

  • Partition-reassigned监听:
    /brokers/partitions_reassigned目录子节点的变化/brokers/partitions_reassigned/[topic]目录子节点的变化


配置参数:

  • LeaderElectionWaitTime:
    设置我们等待选主过程完成的最大时间。

  • KeepInSyncTime:
    设置Leader将一个follower从ISR中移除的最大时间(超过这个时间将从ISR中移除)。


Broker启动:

当一个broker启动的时候,它调用 brokerStartup()函数,并且执行Figure 1和Figure 2中描述的算法。

其中很关键的环节是在replicaStateChange中对High Watermark的处理:

r.hw = min(last checked HW for r, r.leo)

也就是将这个replica的HW设置为已经和Leader确认的HW和本地LEO的最小值。这就是后面不断更新复制协议来补可能丢失已经Commit消息的源头之一。

truncate r's log to r.hw

在becomeFollower中,Follower将本地消息truncate到本地HW,注意这个HW可能是在上面replicaStateChange过程中修复产生的。


第一版的复制协议,描述得很模糊,从LeaderElection函数中可以看到,算法只说明了ISR不为空的情况,而其实这正好是一个集群宕机后恢复最简单的场景。后来的unclean.leader.election.enable参数就是为了简化这个环节的处理过程,这个在后续我们再说。

becomeLeader(r: Replica, ISR: Set[Replica])

这是很关键的过程,等待其他“活跃”的replicas和目标replica同步,完成可能的同步后,计算出ISR、CUR、RAR并记录到Zookeeper中去。


Client
发送生产请求:

当Broker接收到一个生产请求,它调用produce()函数,并执行Figure3和Figure4中描述的算法。

produce函数比较简单,仅仅是把消息放到CommitQ中,由后台专门的committer线程来负责提交数据

首先尝试将数据同步给ISR集合中的replica,如果ISR集合中的某个replica在配置的时间内没有完成和Leader的同步,将被从ISR集合中移除。相反,如果一个在CUR中的replica,同步进度已经追赶上了Leader,将被加入到ISR集合中。判断是否同步的标准是:

r.leo >= pr.offset

最终ack给produce client也是在committer线程中完成的。

如果有被重新分配的replica,检查RAR中是否存在replica,其数据已经完成和Leader同步,尝试切换到新分配的replica上去。checkLoadBalancing函数处理是否应该将Leader切换到最合适的replica上去。判断的条件是当前Leader不是最优选择,且最优候选在ISR集合中。


Follower从leader获取数据:

Follower不断向Leader发送ReplicaFetcherRequests请求。 Figure 5中分别描述了Leader端和Follower端的处理流程。

Leader的处理流程是:

当收到ReplicaFetchRequest后,更新对应replica的LEO,从leader的log中读取请求Offset对应的消息,同时在response中附带leader的HW。


Follower的处理流程是:

  1. 循环执行下面的步骤:

  2. 发送ReplicaFetchRequest

  3. 接收Leader的response

  4. 将消息append到本地Log

  5. 使用response中的leader HW更新本地HW

  6. 更新本地offset


On leader-change event:

Broker会为分配给它的每一个replica注册一个到ZK的监听,监听leader-change的事件。当leader-change事件触发的时候,broker调用Figure 6中描述的onLeaderChange()的函数。

大致流程为:

  • 如果onLeaderChange选出了一个新Leader,那么判断本地replica是否就是这个新Leader自己(所有的Replica都会触发这个回调,包含新Leader自己)。如果当前replica不是新Leader,则调用becomeFollower函数。

  • 如果onLeaderChange告知当前replica原来的Leader消失了,在当前节点尚未被删除之前(有可能当前replica尤其其他原因已经不再是这个partition的一个replica了),触发leaderElection操作。


On replica-change event:

当replica-change事件触发的时候,broker调用Figure 7中描述的onReplicaChange()函数。 

比较简单分为:

  • 一个replica被加入到当前broker

  • 一个replica从当前broker移除


两种情况分别处理。


On partition-reassigned event:

当partition-reassigned事件触发的时候, broker调用Figure 8中描述的onPartitionReassigned()函数。

4. 管理操作流程

新增一个topic或者新增一个partition:

  • 在ZK的/brokers/topics/[topic]/[partition_id]/replicas目录下新增一个或者多个子目录.

  • 一个或者多个replica-change事件会被触发,listener开启每一个replica的初始化引导流程。

 向集群中加入新节点:

  • 管理流程决定哪些partition需要重新分配给新的broker,在ZK的/brokers/partitions_reassigned/[topic]/[partition_id]目录下,新增一个或者多个子目录。

  • 每一个broker都会收到一个partition-reassigned事件。如果一个replica被分配给了一个broker,broker会启动初始化引导流程去创建一个replica。这个partition的当前leader会将这个新replica加入到这个partition的RAR集合中,同时监控它是否和leader同步。一旦这个replica同步完成,leader将这个新的replica更新到ZK中,然后触发一次leaderelection。

  • Leader election完成后,其中一个新replica会变成leader,同时其他的replicas会变成follower。在触发相应的replica-change事件后,旧的replica会被删除掉。

文章转载自数据库技术汇,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论