Suppose a topic has only one partition and that partition has two replicas: the leader replica lives on Broker1 and the follower replica on Broker2. The flow of the leader replica writing data and the follower replica syncing it is traced below through the relevant call chains:

1. The leader replica writes the data to its local disk:

KafkaApis.handleProduceRequest() {
  replicaManager.appendRecords() {
    appendToLocalLog() {
      Partition.appendRecordsToLeader() {
        Log.appendAsLeader() {
          Log.append() {
            // write to disk via LogSegment.append()
            LogSegment.append()
          }
        }
      }
    }
  }
}

2. The leader replica updates its LEO:

KafkaApis.handleProduceRequest() {
  replicaManager.appendRecords() {
    appendToLocalLog() {
      Partition.appendRecordsToLeader() {
        Log.appendAsLeader() {
          Log.append() {
            // update the leader replica's LEO
            updateLogEndOffset(appendInfo.lastOffset + 1)
          }
        }
      }
    }
  }
}

3. The follower replica fetches data, carrying its own LEO:

AbstractFetcherThread.doWork() {
  maybeFetch() {
    buildFetch(fetchStates) {
      // fetchState.fetchOffset here is the follower replica's LEO
      builder.add(topicPartition, new FetchRequest.PartitionData(
        fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
    }
  }
}

4. The leader replica updates the follower LEOs it keeps locally:

ReplicaManager.fetchMessages() {
  // get the read results
  val logReadResults = readFromLog() {
    if (isFromFollower) updateFollowerLogReadResults(replicaId, result) {
      // update the follower LEOs saved on the leader
      partition.updateReplicaLogReadResult(replica, readResult) {
        // finally update the LEO of the replica object
        replica.updateLogReadResult(logReadResult) {
          // update the LEO metadata
          logEndOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        }
      }
    }
  }
}

5. The leader replica tries to update the ISR:

ReplicaManager.fetchMessages() {
  // get the read results
  val logReadResults = readFromLog() {
    if (isFromFollower) updateFollowerLogReadResults(replicaId, result) {
      // try to update the ISR
      val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult) {
        // update the ISR
        updateIsr(newInSyncReplicas)
      }
    }
  }
}

6. The leader replica updates its HW:

ReplicaManager.fetchMessages() {
  // get the read results
  val logReadResults = readFromLog() {
    if (isFromFollower) updateFollowerLogReadResults(replicaId, result) {
      // try to update the ISR and the leader replica's HW
      val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult) {
        // try to update the leader's HW
        maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs) {
          // take the smallest LEO among the qualifying replicas as the new HW
          val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
          // the old HW
          val oldHighWatermark = leaderReplica.highWatermark
          // update only if the new HW is larger than the old one
          if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
              (oldHighWatermark.messageOffset == newHighWatermark.messageOffset &&
               oldHighWatermark.onOlderSegment(newHighWatermark))) {
            // update the leader replica's HW
            leaderReplica.highWatermark = newHighWatermark
          }
        }
      }
    }
  }
}

7. The leader replica returns data to the follower, carrying the leader's HW:

ReplicaManager.fetchMessages() {
  // get the read results
  val logReadResults = readFromLog() {
    readFromLocalLog() {
      read() {
        val readInfo = partition.readRecords() {
          // get the leader replica's high watermark
          val initialHighWatermark = localReplica.highWatermark.messageOffset
        }
      }
    }
  }
}

8. The follower replica writes the data and updates its own LEO:

ReplicaFetcherThread.processPartitionData() {
  partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false) {
    doAppendRecordsToFollowerOrFutureReplica() {
      Log.appendAsFollower() {
        Log.append() {
          // update the follower replica's LEO
          updateLogEndOffset(appendInfo.lastOffset + 1)
        }
      }
    }
  }
}

9. The follower replica updates its local HW:

ReplicaFetcherThread.processPartitionData() {
  // update the follower's local HW from the HW returned by the leader: take min(follower LEO, leader HW)
  val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)
  // update the follower replica's HW object
  replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
}
For the HW, the leader replica and each follower replica keep only their own value.
For the LEO, a follower replica keeps only its own, while the leader replica keeps its own LEO plus the LEO of every follower replica.
On both the leader's node and the followers' nodes, the partition's Partition object holds all of the replica objects, but only the local replica object has a backing log file.
The whole write-and-sync process breaks down into nine steps:
1. The leader replica writes the data to its local disk.
2. The leader replica updates its LEO.
3. The follower replica sends a fetch request carrying its own LEO.
4. The leader replica updates the follower LEOs it keeps locally.
5. The leader replica tries to update the ISR.
6. The leader replica updates its HW.
7. The leader replica returns data to the follower, carrying the leader's HW.
8. The follower replica receives the response, writes the data, and updates its own LEO.
9. The follower replica updates its local HW.
The steps are analyzed in detail below. Steps 1 and 2 were already covered when the write path of the log object was analyzed, so they are not repeated here (see 《深入理解Kafka服务端之日志对象的读写数据流程》). Because the remaining steps happen on different nodes, they are not analyzed strictly in the order above; instead they are grouped into:
Operations on the follower replica side: steps 3, 8 and 9.
Operations on the leader replica side: steps 4, 5, 6 and 7.
The follower-side fetching is implemented by an abstract class and its concrete subclass:
Abstract class: AbstractFetcherThread
Implementation class: ReplicaFetcherThread
abstract class AbstractFetcherThread(name: String,                       // thread name
                                     clientId: String,                   // client id, used in log output
                                     val sourceBroker: BrokerEndPoint,   // the source broker to fetch from
                                     failedPartitions: FailedPartitions, // partitions that failed while being processed by this thread
                                     fetchBackOffMs: Int = 0,            // retry interval, defaults to the broker-side replica.fetch.backoff.ms
                                     isInterruptible: Boolean = true)    // whether the thread may be interrupted
  extends ShutdownableThread(name, isInterruptible) {

  type FetchData = FetchResponse.PartitionData[Records]
  type EpochData = OffsetsForLeaderEpochRequest.PartitionData

  // PartitionFetchState describes a partition's fetch state: the fetched offset plus the replica state.
  // The replica state is defined by the ReplicaState trait and is either Fetching or Truncating.
  private val partitionStates = new PartitionStates[PartitionFetchState]
  ...
}
Here, type gives the specified class an alias, for example:
type FetchData = FetchResponse.PartitionData[Records]
From then on, FetchData can be used in place of the FetchResponse.PartitionData[Records] class; EpochData works the same way.
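For readers less familiar with Scala, here is a tiny self-contained sketch of a type alias; OffsetMap is a made-up name for the demo, used the same way AbstractFetcherThread shortens its long generic types:

object TypeAliasDemo {
  // OffsetMap is just another name for Map[String, Long]
  type OffsetMap = Map[String, Long]

  def main(args: Array[String]): Unit = {
    val offsets: OffsetMap = Map("topic-0" -> 42L)
    println(offsets("topic-0")) // 42
  }
}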
FetchResponse.PartitionData: FetchResponse is the class that wraps the response to a FETCH request, and PartitionData is a nested class holding the fetch information of a single partition in that response, including the leader replica's high watermark, the partition log's start offset, the fetched record set, and so on.
public static final class PartitionData<T extends BaseRecords> {
    public final Errors error;                                  // error code
    public final long highWatermark;                            // the partition's high watermark returned by the leader
    public final long lastStableOffset;                         // the latest LSO
    public final long logStartOffset;                           // log start offset
    public final Optional<Integer> preferredReadReplica;        // preferred read replica; since Kafka 2.4 some follower replicas may serve reads
    public final List<AbortedTransaction> abortedTransactions;  // aborted transactions of this partition
    public final T records;                                     // record set
}
The other alias, EpochData, points at OffsetsForLeaderEpochRequest.PartitionData:

public static class PartitionData {
    public final Optional<Integer> currentLeaderEpoch;
    public final int leaderEpoch;
}
PartitionFetchState: a case class describing a partition's fetch state. It holds the fetched offset, the current leader epoch, the replica fetch state, and so on.
case class PartitionFetchState(fetchOffset: Long,        // the offset fetched so far
                               currentLeaderEpoch: Int,  // the current leader epoch
                               delay: DelayedItem,
                               state: ReplicaState) {    // the replica fetch state
  // The partition's fetch state:
  // 1. Ready for fetch: the fetcher thread can currently read data. Condition: the replica is Fetching and not delayed.
  def isReadyForFetch: Boolean = state == Fetching && !isDelayed
  // 2. Truncating: the replica is truncating its log (for example it has just become a follower). Condition: the replica is Truncating and not delayed.
  def isTruncating: Boolean = state == Truncating && !isDelayed
  // 3. Delayed: the fetcher hit an error while fetching and has to wait before retrying. Condition: there is an unexpired delay item.
  def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
}
isReadyForFetch: ready for fetch, meaning the fetcher thread can currently read data. Condition: the replica is in the Fetching state and is not delayed.
isTruncating: truncating, meaning the replica is performing a truncation (for example it has just become a follower). Condition: the replica is in the Truncating state and is not delayed.
isDelayed: delayed, meaning the fetcher hit an error while fetching and has to wait before retrying. Condition: there is an unexpired delay item.
The replica fetch states:
ReplicaState: a trait describing the replica's fetch state.
sealed trait ReplicaState
// truncating
case object Truncating extends ReplicaState
// fetching
case object Fetching extends ReplicaState
Truncating: the replica is truncating its log. Fetching: the replica is fetching data.
buildFetch: builds the fetch request
truncate: truncates the log
processPartitionData: processes the returned response
doWork: chains the three methods above into a loop and runs it over and over, which is how messages are synced from the node hosting the leader replica (a minimal sketch of this loop follows).
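To make the cycle concrete, here is a minimal, self-contained sketch of that loop. The bodies of maybeTruncate()/maybeFetch() are hypothetical stand-ins, and the demo runs a bounded number of rounds so it terminates; the real doWork() is shown later in this article:

object FetcherLoopDemo {
  // Hypothetical stand-ins for the real maybeTruncate()/maybeFetch()
  def maybeTruncate(): Unit = println("reconcile: truncate partitions that need it")
  def maybeFetch(): Unit = println("build FETCH request, send it, process the response")

  def main(args: Array[String]): Unit = {
    // The real thread loops until shutdown; the demo runs three rounds
    for (_ <- 1 to 3) {
      maybeTruncate()
      maybeFetch()
    }
  }
}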
The definition of the ReplicaFetcherThread class:

class ReplicaFetcherThread(name: String,
                           fetcherId: Int,            // id (number) of this fetch thread.
                                                      // A single broker may run several ReplicaFetcherThread instances;
                                                      // the broker-side parameter num.replica.fetchers decides how many follower fetch threads Kafka creates.
                           sourceBroker: BrokerEndPoint,
                           brokerConfig: KafkaConfig, // server-side configuration, used to read config values
                           failedPartitions: FailedPartitions,
                           replicaMgr: ReplicaManager, // the replica manager; the thread uses it to get partition objects, replica objects and their logs
                           metrics: Metrics,
                           time: Time,
                           quota: ReplicaQuota,        // quota used to throttle the follower's fetch rate
                           leaderEndpointBlockingSend: Option[BlockingSend] = None) // class used for synchronous sends:
                                                      // the thread sends a request to the given broker and blocks until the response comes back.
  extends AbstractFetcherThread(name = name,
                                clientId = name,
                                sourceBroker = sourceBroker,
                                failedPartitions,
                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                isInterruptible = false) {

  // id of the broker this follower replica lives on
  private val replicaId = brokerConfig.brokerId
  // the object that actually sends the requests
  private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(
    new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId,
      s"broker-$replicaId-fetcher-$fetcherId", logContext))
  // max time the follower's FETCH request may wait before being answered: replica.fetch.wait.max.ms, default 500 ms
  private val maxWait = brokerConfig.replicaFetchWaitMaxMs
  // minimum bytes that must be accumulated before a FETCH response is returned: replica.fetch.min.bytes, default 1 byte
  private val minBytes = brokerConfig.replicaFetchMinBytes
  // maximum size of a valid FETCH response: replica.fetch.response.max.bytes, default 10 MB
  private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
  // maximum bytes that may be fetched for a single partition: replica.fetch.max.bytes, default 1 MB
  private val fetchSize = brokerConfig.replicaFetchMaxBytes
  ...
}
The buildFetch() method: builds the FetchRequest.Builder object for the given set of partitions; that builder is the core component for constructing a FetchRequest.
There is one important operation in this method:
When the fetch request is built, it carries the follower replica's LogStartOffset and LEO (this is step 3 of the sync flow).
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
  // set that collects the partitions that hit an error
  val partitionsWithError = mutable.Set[TopicPartition]()
  val builder = fetchSessionHandler.newBuilder()
  // Walk over every partition: the ones in a fetchable state are added to the builder for later processing;
  // the ones that fail are added to the error set.
  partitionMap.foreach { case (topicPartition, fetchState) =>
    // only if the partition is fetchable and the follower is not throttled for it
    if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
      try {
        // the logStartOffset of the partition log kept by the local follower replica
        val logStartOffset = replicaMgr.localReplicaOrException(topicPartition).logStartOffset
        /**
         * Add the partition and its PartitionData to the builder. Note that this PartitionData belongs to
         * FetchRequest; it wraps the metadata of the fetch, such as:
         * fetchOffset: the offset to start fetching from, i.e. the follower replica's LEO
         * currentLeaderEpoch: the leader epoch saved by the follower replica
         */
        builder.add(topicPartition, new FetchRequest.PartitionData(
          fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
      } catch {
        case _: KafkaStorageException =>
          // on error, add the partition to the error set
          partitionsWithError += topicPartition
      }
    }
  }
  val fetchData = builder.build()
  val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {
    None
  } else {
    // construct the Builder object of the FETCH request
    val requestBuilder = FetchRequest.Builder.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend)
      .setMaxBytes(maxBytes)
      .toForget(fetchData.toForget)
      .metadata(fetchData.metadata)
    Some(requestBuilder)
  }
  // return the Builder object plus the list of partitions that failed
  ResultWithPartitions(fetchRequestOpt, partitionsWithError)
}
The truncate() method: truncates the log of the given partition to the given offset.
override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
  // get the local replica for this partition
  val replica = replicaMgr.localReplicaOrException(tp)
  val partition = replicaMgr.getPartition(tp).get
  // truncate the log via Partition.truncateTo
  // offsetTruncationState.offset: the offset to truncate to
  partition.truncateTo(offsetTruncationState.offset, isFuture = false)

  if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
    warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " +
      s"${replica.highWatermark.messageOffset}")

  if (offsetTruncationState.truncationCompleted)
    replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp,
      offsetTruncationState.offset)
}
Internally this method walks down the chain Partition.truncateTo -> LogManager.truncateTo -> Log.truncateTo -> LogSegment.truncateTo to perform the actual truncation (a simplified sketch of the truncation rule follows).
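As a rough illustration of what "truncate to an offset" means at the end of that chain, here is a self-contained sketch on a toy in-memory log. The truncateTo helper and the (offset, record) pairs are assumptions for the demo, not Kafka's API:

import scala.collection.mutable.ArrayBuffer

object TruncateDemo {
  // Keep only the entries whose offset is below the target offset,
  // i.e. truncate the toy "log" to that offset.
  def truncateTo(log: ArrayBuffer[(Long, String)], targetOffset: Long): Unit = {
    val kept = log.filter { case (offset, _) => offset < targetOffset }
    log.clear()
    log ++= kept
  }

  def main(args: Array[String]): Unit = {
    val log = ArrayBuffer(0L -> "a", 1L -> "b", 2L -> "c", 3L -> "d")
    truncateTo(log, targetOffset = 2L)
    println(log) // ArrayBuffer((0,a), (1,b))
  }
}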
The processPartitionData() method: processes the response for the given partition returned by the node hosting the leader replica, writes the fetched messages to local storage, and returns the metadata of the written messages.
There are two important operations here:
Write the messages and update the follower replica's LEO (step 8 of the sync flow).
Update the follower replica's local HW (step 9 of the sync flow).
override def processPartitionData(topicPartition: TopicPartition, // the partition being fetched
                                  fetchOffset: Long,              // the start offset of the fetched record set
                                  partitionData: FetchData        // the partition data read from the leader
                                 ): Option[LogAppendInfo] = {     // returns the metadata of the appended messages
  // get the Replica object from the replica manager
  val replica = replicaMgr.localReplicaOrException(topicPartition)
  // get the Partition object for this topic partition from the replica manager
  val partition = replicaMgr.getPartition(topicPartition).get
  // wrap the fetched messages as MemoryRecords
  val records = toMemoryRecords(partitionData.records)
  // check whether the fetched record set is oversized
  maybeWarnIfOversizedRecords(records, topicPartition)

  // it is an error if the fetch start offset does not equal the local log's LEO
  if (fetchOffset != replica.logEndOffset)
    throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
      topicPartition, fetchOffset, replica.logEndOffset))

  if (isTraceEnabled)
    trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
      .format(replica.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))

  // append to the follower's local log, which updates the follower's LEO
  val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)

  if (isTraceEnabled)
    trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
      .format(replica.logEndOffset, records.sizeInBytes, topicPartition))

  // update the follower's local HW from the HW returned by the leader: take min(follower LEO, leader HW)
  val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)
  // the LogStartOffset returned by the leader
  val leaderLogStartOffset = partitionData.logStartOffset
  // update the follower replica's HW object
  replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
  // maybe advance the follower replica's LogStartOffset
  replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
  if (isTraceEnabled)
    trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")

  // throttle replication traffic if the partition is throttled
  if (quota.isThrottled(topicPartition))
    quota.record(records.sizeInBytes)
  replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)

  // return the metadata of the appended messages
  logAppendInfo
}
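The HW rule applied near the end of this method boils down to one line of arithmetic; the sketch below, with hypothetical parameter names, shows it in isolation:

object FollowerHighWatermarkDemo {
  // The follower's new HW is the smaller of its own LEO and the HW the leader
  // returned in the fetch response.
  def followerHighWatermark(followerLogEndOffset: Long, leaderHighWatermark: Long): Long =
    math.min(followerLogEndOffset, leaderHighWatermark)

  def main(args: Array[String]): Unit = {
    println(followerHighWatermark(followerLogEndOffset = 80L, leaderHighWatermark = 100L))  // 80
    println(followerHighWatermark(followerLogEndOffset = 120L, leaderHighWatermark = 100L)) // 100
  }
}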
override def doWork() {
  // maybe truncate the log
  maybeTruncate()
  // maybe fetch data
  maybeFetch()
}
This method is simple: it just calls two other methods in turn.
maybeTruncate(): tries to truncate the log.
private def maybeTruncate(): Unit = {
  // split the partitions that are in the Truncating state by whether they have a leader epoch
  val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
  // partitions with a leader epoch are truncated to the end offset of that epoch
  if (partitionsWithEpochs.nonEmpty) {
    truncateToEpochEndOffsets(partitionsWithEpochs)
  }
  // partitions without a leader epoch are truncated to the high watermark
  if (partitionsWithoutEpochs.nonEmpty) {
    truncateToHighWatermark(partitionsWithoutEpochs)
  }
}
Look first at the partitions without a leader epoch, whose logs are truncated to the high watermark:
private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
  val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
  // walk over every partition that needs truncation
  for (tp <- partitions) {
    // fetch the partition's fetch state
    val partitionState = partitionStates.stateValue(tp)
    if (partitionState != null) {
      // take the high watermark
      val highWatermark = partitionState.fetchOffset
      // wrap it as a truncation state
      val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)

      info(s"Truncating partition $tp to local high watermark $highWatermark")
      // truncate to the high watermark
      if (doTruncate(tp, truncationState))
        // record the partition and its truncation state
        fetchOffsets.put(tp, truncationState)
    }
  }
  // update the fetch states of this group of partitions
  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
maybeFetch(): tries to fetch data.

private def maybeFetch(): Unit = {
  // get the partition states and the corresponding fetch request
  val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
    // the partitions to fetch from and their states
    val fetchStates = partitionStates.partitionStateMap.asScala
    // Step a: build the FetchRequest.Builder for these partitions. Two results come back:
    // fetchRequestOpt: the core info of what to read (which partition, from which offset, how many bytes, ...) plus the FetchRequest.Builder
    // partitionsWithError: the set of partitions that hit an error
    val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)

    // Step b: handle the failed partitions, mainly by moving them to the end of the ordered map so they are retried later
    handlePartitionsWithErrors(partitionsWithError, "maybeFetch")

    // if there is currently nothing to read, wait fetchBackOffMs before retrying
    if (fetchRequestOpt.isEmpty) {
      trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    (fetchStates, fetchRequestOpt)
  }
  // Step c: send the FETCH request to the leader replica and process the response
  fetchRequestOpt.foreach { fetchRequest =>
    processFetchRequest(fetchStates, fetchRequest)
  }
}
a: build the FetchRequest.Builder object for the partitions in the collection:

val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
b: handle the failed partitions; the handling mainly moves each such partition to the end of the ordered map so it can be retried later.
handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
// Remove the given partition from the head of the map and re-insert it at the tail, achieving round-robin.
// The LinkedHashMap preserves insertion order: if the insertion order is a b c d e and a was just read,
// a is removed and re-added at the tail for fairness, so the next round starts from b.
public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {
    map.remove(topicPartition);
    map.put(topicPartition, state);
    updateSize();
}
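The same remove-then-put trick can be reproduced with a plain java.util.LinkedHashMap; the keys below are made-up partition names for the demo:

import java.util.LinkedHashMap

object RoundRobinDemo {
  // A LinkedHashMap keeps insertion order, so re-inserting a key moves it to the tail
  // and the next read starts with a different partition.
  def main(args: Array[String]): Unit = {
    val states = new LinkedHashMap[String, Long]()
    states.put("partition-a", 0L)
    states.put("partition-b", 0L)
    states.put("partition-c", 0L)

    // "partition-a" was just read: remove it and re-insert it at the tail
    val state = states.remove("partition-a")
    states.put("partition-a", state)

    println(states.keySet()) // [partition-b, partition-c, partition-a]
  }
}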
c: iterate over the FETCH requests, send them to the leader replica, and process the responses.
fetchRequestOpt.foreach { fetchRequest =>
  processFetchRequest(fetchStates, fetchRequest)
}
private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],
                                fetchRequest: FetchRequest.Builder): Unit = {
  // set of partitions that hit an error
  val partitionsWithError = mutable.Set[TopicPartition]()
  // the response data
  var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty

  try {
    trace(s"Sending fetch request $fetchRequest")
    // send the FETCH request to the leader and collect the response
    responseData = fetchFromLeader(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning) {
        warn(s"Error in response for fetch request $fetchRequest", t)
        inLock(partitionMapLock) {
          partitionsWithError ++= partitionStates.partitionSet.asScala
          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
        }
      }
  }
  // update the request-rate metric
  fetcherStats.requestRate.mark()

  // if a response was received
  if (responseData.nonEmpty) {
    inLock(partitionMapLock) {
      // iterate over the partitions in the response and their data
      responseData.foreach { case (topicPartition, partitionData) =>
        Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
          // the fetch state of this partition
          val fetchState = fetchStates(topicPartition)
          // The response is processed only when:
          // 1. the start offset of the fetched record set equals the saved next offset to write, and
          // 2. the partition is currently in a fetchable state
          if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
            // the leader epoch the follower carried in the request
            val requestEpoch = if (fetchState.currentLeaderEpoch >= 0)
              Some(fetchState.currentLeaderEpoch)
            else
              None
            partitionData.error match {
              // no error
              case Errors.NONE =>
                try {
                  // let the subclass process the response
                  val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
                    partitionData)

                  logAppendInfoOpt.foreach { logAppendInfo =>
                    val validBytes = logAppendInfo.validBytes
                    val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
                    fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset)

                    if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                      val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch,
                        state = Fetching)
                      // move this partition to the end of the ordered map to keep the read order fair
                      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
                      fetcherStats.byteRate.mark(validBytes)
                    }
                  }
                } catch {
                  case ime: CorruptRecordException =>
                    error(s"Found invalid messages during fetch for partition $topicPartition " +
                      s"offset ${currentFetchState.fetchOffset}", ime)
                    partitionsWithError += topicPartition
                  case e: KafkaStorageException =>
                    error(s"Error while processing data for partition $topicPartition " +
                      s"at offset ${currentFetchState.fetchOffset}", e)
                    markPartitionFailed(topicPartition)
                  case t: Throwable =>
                    error(s"Unexpected error occurred while processing data for partition $topicPartition " +
                      s"at offset ${currentFetchState.fetchOffset}", t)
                    markPartitionFailed(topicPartition)
                }
              // the fetch offset is out of range, usually because the leader changed
              case Errors.OFFSET_OUT_OF_RANGE =>
                // fix the out-of-range offset, mainly by truncating
                if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
                  // if that still does not succeed, add the partition to the error set
                  partitionsWithError += topicPartition
              // the leader epoch saved by the follower is newer than the one on the leader's broker
              case Errors.UNKNOWN_LEADER_EPOCH =>
                debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
                  s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")
                // add to the error set
                partitionsWithError += topicPartition
              // the leader epoch saved by the follower is older than the one on the leader's broker
              case Errors.FENCED_LEADER_EPOCH =>
                // mark the partition as failed: remove it from the fetch-state map and add it to the failed set
                if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
              // the leader changed
              case Errors.NOT_LEADER_FOR_PARTITION =>
                debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
                  "that the partition is being moved")
                // add to the error set
                partitionsWithError += topicPartition
              case _ =>
                error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",
                  partitionData.error.exception)
                // add to the error set
                partitionsWithError += topicPartition
            }
          }
        }
      }
    }
  }

  // handle the failed partitions, mainly by moving them to the end of the map
  if (partitionsWithError.nonEmpty) {
    handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
  }
}
On the leader's broker, the FETCH request is handled by KafkaApis.handleFetchRequest(), which internally calls the ReplicaManager.fetchMessages() method:
def handleFetchRequest(request: RequestChannel.Request) {
  ...
  // this is where the follower replica's fetch request is actually handled
  replicaManager.fetchMessages(
    fetchRequest.maxWait.toLong,
    fetchRequest.replicaId,
    fetchRequest.minBytes,
    fetchRequest.maxBytes,
    versionId <= 2,
    interesting,
    replicationQuota(fetchRequest),
    processResponseCallback,
    fetchRequest.isolationLevel)
  ...
}
The fetchMessages() method:
def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota = UnboundedQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                  isolationLevel: IsolationLevel) {
  val isFromFollower = Request.isValidBrokerId(replicaId)
  val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId
  val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
    FetchLogEnd
  else if (isolationLevel == IsolationLevel.READ_COMMITTED)
    FetchTxnCommitted
  else
    FetchHighWatermark

  // read the data from the local disk
  def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
    val result = readFromLocalLog(
      replicaId = replicaId,
      fetchOnlyFromLeader = fetchOnlyFromLeader,
      fetchIsolation = fetchIsolation,
      fetchMaxBytes = fetchMaxBytes,
      hardMaxBytesLimit = hardMaxBytesLimit,
      readPartitionInfo = fetchInfos,
      quota = quota)
    if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
    else result
  }

  // get the read results
  val logReadResults = readFromLog()

  var bytesReadable: Long = 0
  var errorReadingData = false
  val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
  logReadResults.foreach { case (topicPartition, logReadResult) =>
    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    logReadResultMap.put(topicPartition, logReadResult)
  }

  if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
        result.lastStableOffset, result.info.abortedTransactions)
    }
    responseCallback(fetchPartitionData)
  } else {
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
    fetchInfos.foreach { case (topicPartition, partitionData) =>
      logReadResultMap.get(topicPartition).foreach(logReadResult => {
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
      })
    }
    val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
      fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
The readFromLocalLog() method reads the data from the local log:

def readFromLocalLog(replicaId: Int,
                     fetchOnlyFromLeader: Boolean,
                     fetchIsolation: FetchIsolation,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
                     readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {

  def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    // the start offset to read from
    val offset = fetchInfo.fetchOffset
    // the fetch size
    val partitionFetchSize = fetchInfo.maxBytes
    // the follower replica's LogStartOffset
    val followerLogStartOffset = fetchInfo.logStartOffset

    brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()

    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
    try {
      trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
        s"remaining response limit $limitBytes" +
        (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
      val fetchTimeMs = time.milliseconds

      // read the data; the result contains the fetched records plus LEO, HW, LogStartOffset, etc.
      val readInfo = partition.readRecords(
        // the start offset to read from
        fetchOffset = fetchInfo.fetchOffset,
        // the leader epoch saved by the follower replica
        currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
        maxBytes = adjustedMaxBytes,
        fetchIsolation = fetchIsolation,
        fetchOnlyFromLeader = fetchOnlyFromLeader,
        minOneMessage = minOneMessage)

      // the data that was read
      val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {
        // if the partition is throttled, return an empty record set
        FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
      } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
        // if the returned record set is incomplete, also return an empty record set
        FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
      } else {
        // normal case
        readInfo.fetchedData
      }

      // wrap the result with the data that was read
      LogReadResult(info = fetchDataInfo,
        highWatermark = readInfo.highWatermark,            // the leader's HW
        leaderLogStartOffset = readInfo.logStartOffset,    // the leader's LogStartOffset
        leaderLogEndOffset = readInfo.logEndOffset,        // the leader's LEO
        followerLogStartOffset = followerLogStartOffset,   // the follower's LogStartOffset
        fetchTimeMs = fetchTimeMs,
        readSize = adjustedMaxBytes,
        lastStableOffset = Some(readInfo.lastStableOffset),
        exception = None)                                  // no exception
    } catch {
      case e@ (_: UnknownTopicOrPartitionException |
               _: NotLeaderForPartitionException |
               _: UnknownLeaderEpochException |
               _: FencedLeaderEpochException |
               _: ReplicaNotAvailableException |
               _: KafkaStorageException |
               _: OffsetOutOfRangeException) =>
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          highWatermark = -1L,
          leaderLogStartOffset = -1L,
          leaderLogEndOffset = -1L,
          followerLogStartOffset = -1L,
          fetchTimeMs = -1L,
          readSize = 0,
          lastStableOffset = None,
          exception = Some(e))
      case e: Throwable =>
        brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
        brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()

        val fetchSource = Request.describeReplicaId(replicaId)
        error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
          s"on partition $tp: $fetchInfo", e)

        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          highWatermark = -1L,
          leaderLogStartOffset = -1L,
          leaderLogEndOffset = -1L,
          followerLogStartOffset = -1L,
          fetchTimeMs = -1L,
          readSize = 0,
          lastStableOffset = None,
          exception = Some(e))
    }
  }

  // the maximum bytes to read
  var limitBytes = fetchMaxBytes
  // the result buffer
  val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
  // whether at least one message must be returned
  var minOneMessage = !hardMaxBytesLimit
  // iterate over the partitions and read each one
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    // read this partition
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
    // bytes read for this partition
    val recordBatchSize = readResult.info.records.sizeInBytes
    if (recordBatchSize > 0)
      minOneMessage = false
    // update how many bytes may still be read
    limitBytes = math.max(0, limitBytes - recordBatchSize)
    // add this partition's result to the result set
    result += (tp -> readResult)
  }
  // return the result set
  result
}
Inside Partition.readRecords(), the leader replica's high watermark is read, which is what gets carried back to the follower (step 7 of the sync flow):

// get the leader replica's high watermark
val initialHighWatermark = localReplica.highWatermark.messageOffset
The updateFollowerLogReadResults() method: records the follower's fetch results on the leader (step 4 of the sync flow):

private def updateFollowerLogReadResults(replicaId: Int,
                                         readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
  debug(s"Recording follower broker $replicaId log end offsets: $readResults")
  readResults.map { case (topicPartition, readResult) =>
    var updatedReadResult = readResult
    nonOfflinePartition(topicPartition) match {
      // the partition was found
      case Some(partition) =>
        // Look up the replica object saved in the Partition object by replica id.
        // Partition.allReplicasMap keeps all replica objects of this partition: the key is the broker id, the value is the Replica object.
        partition.getReplica(replicaId) match {
          // the Replica object was found
          case Some(replica) =>
            // update the follower LEOs saved on the leader
            partition.updateReplicaLogReadResult(replica, readResult)
          case None =>
            warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +
              s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
              s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} " +
              s"for partition $topicPartition. Empty records will be returned for this partition.")
            updatedReadResult = readResult.withEmptyFetchInfo
        }
      // the partition has not been created
      case None =>
        warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created.")
    }
    topicPartition -> updatedReadResult
  }
}
The Partition.updateReplicaLogReadResult() method:
def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {
  val replicaId = replica.brokerId
  val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
  // finally update the follower replica's LEO saved by the leader
  replica.updateLogReadResult(logReadResult)
  val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
  val leaderLWIncremented = newLeaderLW > oldLeaderLW
  // try to update the ISR; this call may also update the leader replica's HW object and the HW of the partition's Log object
  val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)

  val result = leaderLWIncremented || leaderHWIncremented
  if (result)
    tryCompleteDelayedRequests()

  debug(s"Recorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.")
  result
}
The Replica.updateLogReadResult() method: updates the LEO the leader keeps for this follower replica (step 4 of the sync flow):

def updateLogReadResult(logReadResult: LogReadResult) {
  if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
  else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

  // update the follower replica's log start offset, i.e. the _logStartOffset field
  logStartOffset = logReadResult.followerLogStartOffset
  // update the follower replica's LEO metadata, i.e. the _logEndOffsetMetadata field
  logEndOffsetMetadata = logReadResult.info.fetchOffsetMetadata
  // the leader's LEO at the time of the last fetch
  lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
  lastFetchTimeMs = logReadResult.fetchTimeMs
}
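Conceptually, the leader is just maintaining one LEO per follower. The sketch below models that bookkeeping with a plain map keyed by broker id; it is a simplification of the Replica objects used in the real code, and the names are illustrative:

import scala.collection.mutable

object FollowerLeoTrackingDemo {
  // one LEO per follower, keyed by broker id
  private val followerLeos = mutable.Map[Int, Long]()

  def onFetch(followerBrokerId: Int, fetchOffset: Long): Unit =
    // the fetch offset carried in the FETCH request is the follower's LEO
    followerLeos(followerBrokerId) = fetchOffset

  def main(args: Array[String]): Unit = {
    onFetch(followerBrokerId = 2, fetchOffset = 42L)
    onFetch(followerBrokerId = 3, fetchOffset = 40L)
    println(followerLeos) // the HW candidate is the minimum of these plus the leader's own LEO
  }
}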
The maybeExpandIsr() method: tries to update the ISR (step 5 of the sync flow):

def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    // check whether the given replica needs to be added to the ISR
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        // the Replica object of the given broker
        val replica = getReplica(replicaId).get
        // the leader replica's HW
        val leaderHW = leaderReplica.highWatermark
        // the follower replica's LEO
        val fetchOffset = logReadResult.info.fetchOffsetMetadata.messageOffset
        // Conditions for expanding the ISR:
        // 1. the replica is not yet in the ISR
        // 2. the follower replica belongs to this partition's assigned replicas
        // 3. the follower replica's LEO is not smaller than the leader's HW (offsetDiff(leaderHW) >= 0)
        // 4. the start offset of the leader epoch exists and is not greater than the follower's LEO
        // When all four hold, the follower has caught up with the leader and is added to the ISR.
        if (!inSyncReplicas.contains(replica) &&
            assignedReplicas.map(_.brokerId).contains(replicaId) &&
            replica.logEndOffsetMetadata.offsetDiff(leaderHW) >= 0 &&
            leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {
          // add the replica to the set
          val newInSyncReplicas = inSyncReplicas + replica
          info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
            s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
          // update ISR in ZK and cache
          updateIsr(newInSyncReplicas)
          replicaManager.isrExpandRate.mark()
        }
        // check if the HW of the partition can now be incremented
        // since the replica may already be in the ISR and its LEO has just incremented
        // try to update the leader replica's HW object and the HW of the partition's Log object
        maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)

      case None => false // nothing to do if no longer leader
    }
  }
}
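Stripped of the Replica and offset-metadata types, the four-way check reduces to a boolean expression. The sketch below restates it over plain values; the parameter names are illustrative, not Kafka's API:

object IsrExpansionDemo {
  // The four conditions maybeExpandIsr checks before adding a follower to the ISR
  def shouldExpandIsr(alreadyInIsr: Boolean,
                      isAssignedReplica: Boolean,
                      followerLeo: Long,
                      leaderHw: Long,
                      leaderEpochStartOffset: Option[Long]): Boolean =
    !alreadyInIsr &&
      isAssignedReplica &&
      followerLeo >= leaderHw &&
      leaderEpochStartOffset.exists(followerLeo >= _)

  def main(args: Array[String]): Unit = {
    println(shouldExpandIsr(alreadyInIsr = false, isAssignedReplica = true,
      followerLeo = 100L, leaderHw = 95L, leaderEpochStartOffset = Some(90L))) // true
    println(shouldExpandIsr(alreadyInIsr = false, isAssignedReplica = true,
      followerLeo = 80L, leaderHw = 95L, leaderEpochStartOffset = Some(90L)))  // false: still behind the HW
  }
}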
The maybeIncrementLeaderHW() method: tries to update the leader replica's HW object and the HW of the partition's Log object (step 6 of the sync flow).
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
  val allLogEndOffsets = assignedReplicas.filter { replica =>
    curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
  }.map(_.logEndOffsetMetadata)
  // take the smallest LEO among these replicas (ISR members, or replicas that caught up recently) as the new HW
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  // the old HW
  val oldHighWatermark = leaderReplica.highWatermark

  // update only if the new HW is larger than the old one
  if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
    // update the Replica's highWatermark object, as well as the high watermark of the corresponding Log object
    leaderReplica.highWatermark = newHighWatermark
    debug(s"High watermark updated to $newHighWatermark")
    true
  } else {
    def logEndOffsetString(r: Replica) = s"replica ${r.brokerId}: ${r.logEndOffsetMetadata}"
    debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. " +
      s"All current LEOs are ${assignedReplicas.map(logEndOffsetString)}")
    false
  }
}
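The core of this method is "candidate HW = min(LEO of the qualifying replicas), adopted only if it moved forward". A minimal sketch, ignoring segment boundaries and the onOlderSegment case, with illustrative names:

object LeaderHighWatermarkDemo {
  def maybeIncrementLeaderHW(qualifyingLogEndOffsets: Seq[Long], oldHighWatermark: Long): Long = {
    // the smallest LEO among the qualifying replicas is the HW candidate
    val newHighWatermark = qualifyingLogEndOffsets.min
    // only adopt it if it is strictly larger than the old HW
    if (newHighWatermark > oldHighWatermark) newHighWatermark else oldHighWatermark
  }

  def main(args: Array[String]): Unit = {
    println(maybeIncrementLeaderHW(Seq(100L, 95L, 98L), oldHighWatermark = 90L)) // 95
    println(maybeIncrementLeaderHW(Seq(100L, 80L), oldHighWatermark = 90L))      // 90 (no change)
  }
}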
The Log.onHighWatermarkIncremented() method updates the high watermark kept by the Log object:

def onHighWatermarkIncremented(highWatermark: Long): Unit = {
  lock synchronized {
    // update the high watermark value
    replicaHighWatermark = Some(highWatermark)
    producerStateManager.onHighWatermarkUpdated(highWatermark)
    updateFirstUnstableOffset()
  }
}
It is invoked from the Replica.highWatermark_= setter:

def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
  // only allowed on the local replica
  if (isLocal) {
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")
    // the high watermark metadata object
    highWatermarkMetadata = newHighWatermark
    // update the high watermark saved by the Log object
    log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
    trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
  } else {
    throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")
  }
}
which in turn is triggered by the assignment in maybeIncrementLeaderHW():

// update the Replica's highWatermark object, as well as the high watermark of the corresponding Log object
leaderReplica.highWatermark = newHighWatermark
Back in fetchMessages(), the read results are collected and returned to the follower at an appropriate time:

def fetchMessages() {
  ...
  logReadResults.foreach { case (topicPartition, logReadResult) =>
    // if the read hit an error
    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    // save the read result into the map
    logReadResultMap.put(topicPartition, logReadResult)
  }
  ...
}
The "appropriate time" mentioned above is either an immediate return or a delayed return: when any one of the four conditions below holds, the response is returned immediately, otherwise the request is handled with a delay (see the sketch after this list):
The wait timeout of the fetch has expired.
The fetch request contains no partitions to fetch.
Enough data has already been fetched.
An error occurred while fetching.
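Those four conditions are exactly the boolean test in fetchMessages(); restated as a small sketch with illustrative parameter names:

object FetchResponseDecisionDemo {
  // Decide between answering immediately and parking the request in the delayed-fetch purgatory
  def respondImmediately(timeoutMs: Long,
                         partitionCount: Int,
                         bytesReadable: Long,
                         fetchMinBytes: Int,
                         errorReadingData: Boolean): Boolean =
    timeoutMs <= 0 || partitionCount == 0 || bytesReadable >= fetchMinBytes || errorReadingData

  def main(args: Array[String]): Unit = {
    // not enough bytes yet and no error: the request is delayed
    println(respondImmediately(timeoutMs = 500, partitionCount = 1, bytesReadable = 0, fetchMinBytes = 1, errorReadingData = false))   // false
    // enough bytes accumulated: respond right away
    println(respondImmediately(timeoutMs = 500, partitionCount = 1, bytesReadable = 4096, fetchMinBytes = 1, errorReadingData = false)) // true
  }
}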
To summarize, the process of the leader replica writing data and the follower replica syncing it consists of nine steps:
1. The leader replica writes the data to its local disk.
2. The leader replica updates its LEO.
3. The follower replica sends a fetch request carrying its own LEO.
4. The leader replica updates the follower LEOs it keeps locally.
5. The leader replica tries to update the ISR.
6. The leader replica updates its HW.
7. The leader replica returns data to the follower, carrying the leader's HW.
8. The follower replica receives the response, writes the data, and updates its own LEO.
9. The follower replica updates its local HW.
On where HW and LEO are kept:
For the HW, the leader replica and each follower replica keep only their own value.
For the LEO, a follower replica keeps only its own, while the leader replica keeps its own LEO plus the LEO of every follower replica.
On both the leader's node and the followers' nodes, the partition's Partition object holds all of the replica objects, but only the local replica object has a backing log file.
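As a recap, the sketch below walks a toy leader/follower pair through the nine steps, tracking only LEO and HW (epochs, ISR changes and delayed fetches are left out); all names here are made up for the demo:

object ReplicaSyncSimulation {
  final case class Replica(var leo: Long = 0L, var hw: Long = 0L)

  def main(args: Array[String]): Unit = {
    val leader = Replica()
    val follower = Replica()
    var leaderSavedFollowerLeo = 0L

    // Steps 1-2: the leader appends a batch of 5 records and bumps its LEO
    leader.leo += 5

    // Step 3: the follower fetches, carrying its own LEO (still 0)
    val fetchOffset = follower.leo

    // Steps 4-6: the leader records the follower's LEO and recomputes HW = min(all LEOs)
    leaderSavedFollowerLeo = fetchOffset
    leader.hw = math.min(leader.leo, leaderSavedFollowerLeo) // still 0

    // Step 7: the leader returns the 5 records together with its HW
    val (recordCount, leaderHw) = (5L, leader.hw)

    // Step 8: the follower appends the records and bumps its LEO
    follower.leo += recordCount

    // Step 9: the follower's HW = min(its own LEO, the leader's HW)
    follower.hw = math.min(follower.leo, leaderHw)

    // The HW only advances on the next fetch round, once the follower's new LEO reaches the leader
    println(s"leader=$leader follower=$follower")
  }
}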




