暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

图解Kafka:架构设计、消息可靠、数据持久、高性能背后的底层原理

大数据技能圈 2024-10-07
232

导读

在开发需要高吞吐量和高可靠性的消息系统时,Apache Kafka 被广泛认为是程序员的理想选择。本文将深入探讨 Kafka 的内部工作原理,不仅涵盖其整体架构,还包括消息传递过程中的具体细节,解析 Kafka 是如何通过其精妙设计的系统组件与策略来实现高效的消息异步处理及流量控制的。


我们将一同研究 Kafka 的确认(ack)机制、数据持久性方法,以及那些对提高系统性能至关重要的设计特点,比如批量发送、消息压缩、PageCache 利用和零复制技术。此外,本文还将讨论负载均衡策略与集群管理实践,为读者提供一个全方位的视角,帮助理解 Kafka 在应对大规模分布式系统中对消息队列提出的高标准要求时的表现。

目录

1 引言

2 Kafka 概述

3 Kafka 高可靠性探究

4 Kafka 高性能探究

5 其他知识探究

01


引言


在深入探讨 Kafka 的核心知识点之前,让我们先思考一下,哪些场景促使我们选择使用 Kafka?提到这一点,大多数人都会想到异步解耦和流量平滑(削峰填谷)等概念,这些确实是 Kafka 应用的主要场景。

  • 异步解耦:将传统的同步调用转变为异步消息通知,从而实现生产者与消费者之间的解耦。以商品交易为例,当一个订单创建完成后,通常需要触发多种后续操作,如统计数据更新、短信通知、邮件发送等。如果这些操作均采用同步方式执行,将极大影响系统的响应速度和整体性能。为此,可以通过引入消息中间件,将订单创建与其他后续操作解耦,确保主流程不受影响。

  • 流量平滑(削峰填谷):借助 broker 对上游生产者产生的瞬时高峰流量进行缓冲,使消费者能够以更加平滑的速度处理信息。考虑一种情况,如秒杀活动期间,上游系统可能会产生大量下单请求,而下游系统负责执行具体的秒杀业务逻辑,包括库存检查、库存锁定、账户余额验证和订单生成等,这些操作复杂且并发处理能力有限。若没有适当的流量控制措施,上游的高强度请求很可能导致下游系统过载,进而引发整个服务链路的故障。在这种情况下,利用消息队列作为中介,可以有效缓解流量压力,通过平滑处理高峰流量,避免下游系统因瞬间负荷过大而崩溃。


综上所述,在诸如交易和支付等需要高性能和高可靠性的场景中,异步解耦和流量平滑是解决关键问题的有效手段。接下来,我们将探讨 Kafka 是否具备满足这些严格需求的能力,以及它是如何做到这一点的。

02


Kafka概述


在探究 Kafka 的高性能、高可靠性之前,我们从宏观上来看下 Kafka 的系统架构。


在探究 Kafka 的高性能、高可靠性之前,我们从宏观上来看下 Kafka 的系统架构。

如图所示,Kafka 架构主要由 Producer(生产者)、Broker(服务实例)、Consumer(消费者)和用于集群管理的 ZooKeeper 组成,它们各自的功能描述如下:

  • Producer:生产者负责创建消息,并依据一定的路由策略将消息发送至相应的 Broker。

  • Broker:作为服务实例,Broker 承担着消息的持久化存储和转发任务。

  • Consumer:消费者从 Broker 中拉取消息(采用 Pull 方式)并进行消费。通常,多个消费者会被组织成一个消费者组,同一组内的消费者会竞争消费来自同一个主题的消息,确保每条消息仅由该组内的一个消费者处理。

  • ZooKeeper:负责管理 Kafka 集群的元数据,包括 Broker 和 Consumer 的状态信息等。


在 Kafka 的消息流转过程中,有几个核心概念至关重要,分别是:主题(Topic)、分区(Partition)、分段(Segment) 和 位移(Offset)。

  • Topic:消息的主题,Kafka 使用 Topic 对消息进行分类,发送和接收消息时需指定相应的 Topic。

  • Partition:为了提高系统的吞吐量,一个 Topic 可以拥有多个 Partition。这些 Partition 分布于不同的 Broker 上,用于存储 Topic 的消息,这不仅使得 Kafka 能够跨多台机器处理和存储消息,也提供了并行处理消息和水平扩展的能力。为了增强系统的可靠性,每个 Partition 通常会有主副本和多个副副本,并且这些副本分散在不同的 Broker 上,以此达到容灾的效果。

  • Segment:从宏观角度看,一个 Partition 相当于一个日志(Log)。随着生产者不断向日志末尾添加新消息,为了避免单个日志文件过于庞大而导致查询效率下降,Kafka 实现了分段和索引机制,将每个 Partition 划分为多个 Segment。每个 Segment 包括一个 .log 日志文件、两个索引文件(.index 和 .timeindex)及其他可能存在的文件。每个 Segment 的命名基于该段内最小的 Offset 值。当需要查找特定 Offset 的消息时,可以通过二分查找快速定位到相应的 Segment。

  • Offset:表示消息在日志中的位置。每当一条消息被追加到 Partition 的日志文件中时,都会被赋予一个唯一的 Offset。Offset 是一个单调递增且不变的值,它确保了消息在 Partition 内的顺序性。需要注意的是,Offset 的作用范围限定于单个 Partition 内,因此 Kafka 保证的是 Partition 内的顺序而非整个 Topic 的全局顺序。


在对 Kafka 的基础架构及其关键概念有了初步了解之后,接下来我们将深入探讨 Kafka 如何实现高可靠性和高性能。

03


Kafka高可用探究


在探究 Kafka 的高性能、高可靠性之前,我们从宏观上来看下 Kafka 的系统架构。

Kafka 高可靠性的核心在于确保消息在传递过程中不会丢失,这涉及到以下核心环节:

  • 消息要从生产者可靠地发送至 Broker,需注意网络以及本地可能出现丢数据的情况;
  • 发送到 Broker 的消息要可靠持久化,这里要考虑 PageCache 缓存落盘问题、单点崩溃问题以及主从同步跨网络的情况;
  • 消费者需从 Broker 消费到消息,并且最好只消费一次,此过程涉及跨网络消息传输。

3.1 消息从生产者可靠地发送至 Broker
要确保消息从生产者可靠地发送至 Broker,需做到两点:
  • Producer 发送消息后,能够收到来自 Broker 的消息保存成功的 ack;

  • Producer 发送消息后,能够捕获超时、失败 ack 等异常 ack 并进行处理。


3.1.1 ack 策略
针对问题 1,Kafka 为我们提供了三种 ack 策略。其中,
  • Request.required.acks = 0 表示请求发送即认为成功,不关心有没有写成功,常用于日志进行分析场景;
  • Request.required.acks = 1 是指当 leader partition 写入成功以后,才算写入成功,但有丢数据的可能;
  • Request.required.acks = -1 则是 ISR 列表里面的所有副本都写完以后,这条消息才算写入成功,属于强可靠性保证。

为了实现强可靠的 kafka 系统,我们需要将 Request.required.acks 设置为 -1,同时还需设置集群中处于正常同步状态的副本 follower 数量 min.insync.replicas>2。另外,设置 unclean.leader.election.enable=false,使得集群中只有 ISR 的 follower 才可变成新的 leader,避免特殊情况下消息截断的出现。

3.1.2 消息发送策略

针对如何确保生产者在发送消息后能够捕获并处理超时或失败的 ack,Kafka 提供了两种消息发送方式:同步(sync)发送和异步(async)发送。在 trpc-go 框架中,kafka 插件的相关参数配置如下:

trpc-kafka 底层基于开源的 sarama 实现。无论采用同步发送还是异步发送,消息发送过程中都会涉及两个协程:一个是负责消息发送的主协程,另一个是负责消息分发的 dispatcher 协程。


异步发送

对于异步发送(当 ack != 0
时,ack = 0
的情况不关心写入 Kafka 的结果,将在后文详细讲解),其流程大致如下:

  1. 主协程调用异步发送:在主协程中调用异步发送 Kafka 消息时,实际上是将消息体放入一个输入通道(input channel)。如果消息成功进入通道,该函数会立即返回,不会产生任何阻塞。如果消息未能进入通道,则会返回错误信息。因此,调用异步写入时返回的错误信息仅反映消息是否成功进入输入通道,而无法得知消息是否最终成功发送到 Kafka 的 Broker。

  2. dispatcher 协程处理消息:消息进入输入通道后,另一个名为 dispatcher 的协程会负责遍历输入通道,并将消息发送到特定 Broker 上的主 Partition 上。

  3. 异步监听发送结果:发送结果通过一个异步协程进行监听,该协程会循环处理错误通道(err channel)和成功通道(success channel)。如果在发送过程中出现错误,会记录一条错误日志。因此,在异步写入场景中,关于消息写入 Kafka 的错误信息,我们只能通过错误日志来获取具体发生了什么错误,且不支持用户自定义函数进行兜底处理。这一限制在 trpc-go 的官方文档中也有明确说明。


同步发送

同步发送(当 ack != 0
时)是在异步发送的基础上增加了条件限制实现的。同步消息发送通过 newSyncProducerFromAsyncProducer
方法启动两个异步协程来处理消息成功与失败的“回调”,并使用 waitGroup
进行等待,从而将异步操作转变为同步操作。其流程大致如下(参见 [sarama Kafka 客户端生产者与消费者梳理]):

  1. 启动异步协程:同步发送在内部启动两个异步协程,一个用于处理消息发送成功的情况,另一个用于处理消息发送失败的情况。

  2. 使用 waitGroup
     等待
    :通过 waitGroup
     等待这两个异步协程的完成,从而将异步操作转变为同步操作。

通过上述分析可以发现,Kafka 消息发送本质上都是异步的,但同步发送通过 waitGroup
将异步操作转变为同步操作。同步发送在一定程度上确保了消息在跨网络向 Broker 传输时的可靠性。在同步发送场景中,我们可以明确感知消息是否成功发送到 Broker,如果因网络抖动、机器宕机等故障导致消息发送失败或结果不明,可以通过重试等手段确保消息至少一次(at least once)发送到 Broker。

此外,从 Kafka 0.11.0.0 版本开始,Kafka 为 Producer 提供了两种机制来实现精确一次(exactly once)消息发送:

  • 幂等性(Idempotence):确保即使消息被多次发送,也不会被多次处理。

  • 事务(Transaction):通过事务机制确保一组消息要么全部成功发送,要么全部失败,从而实现精确一次的消息发送。

这些机制进一步增强了 Kafka 在高可靠性场景中的表现。



3.1.3 小结

通过配置 ack 策略、使用同步发送以及事务消息组合能力,我们可以实现跨网络向 Broker 传输消息时的 exactly once 语义。然而,当 Producer 收到 Broker 的成功 ack 时,消息是否就一定不会丢失呢?为了回答这个问题,我们需要详细了解 Broker 在接收到消息后进行了哪些处理。


3.2 发送到 Broker 的消息可靠持久化

为了确保 Producer 收到 Broker 的成功 ack 后,消息在 Broker 环节不会丢失,我们需要重点关注以下几个方面:

  1. Broker 返回 Producer 成功 ack 时,消息是否已经落盘

  2. Broker 宕机是否会导致数据丢失,容灾机制是什么

  3. Replica 副本机制带来的多副本间数据同步一致性问题如何解决

3.2.1 Broker 异步刷盘机制

为了获得更高的吞吐量,Kafka 在 Broker 接收到消息后,只是将数据写入 PageCache,然后认为消息已成功写入。PageCache 中的数据通过 Linux 的 flusher 程序进行异步刷盘,触发条件包括:

  • 主动调用 sync
     或 fsync
     函数。

  • 可用内存低于阈值。

  • 脏数据时间达到阈值。

消息处理的示意图如下:

由于消息首先写入 PageCache,如果在数据还未刷盘时 Broker 宕机,那么 Producer 产生的这部分数据可能会丢失。为了解决单机故障可能导致的数据丢失问题,Kafka 为分区引入了副本机制。


3.2.2 Replica 副本机制

Kafka 每个分区通常有多个副本,这些副本分布在不同的 Broker 上,保存相同的消息(可能有滞后)。副本之间采用“一主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本负责从 leader 拉取消息进行同步。

  • AR(Assigned Replicas):分区的所有副本。

  • ISR(In-Sync Replicas):与 leader 副本保持一定同步的副本,包括 leader 副本在内。

  • OSR(Out-of-Sync Replicas):与 leader 同步滞后过多的副本。

由此可见,AR = ISR + OSR。follower 副本是否与 leader 同步的判断标准取决于 Broker 端参数 replica.lag.time.max.ms
(默认为10秒)。follower 默认每隔500毫秒向 leader 拉取一次数据,只要一个 follower 副本落后 leader 副本的时间不连续超过10秒,Kafka 就认为该 follower 副本与 leader 是同步的。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR = ISR,OSR 集合为空。

当 leader 副本所在的 Broker 宕机时,Kafka 会借助 ZooKeeper 从 follower 副本中选举新的 leader 继续对外提供服务,实现故障的自动转移,保证服务的可用性。为了确保选举的新 leader 和旧 leader 的数据尽可能一致,当 leader 副本发生故障时,默认情况下只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有机会(可以通过设置 unclean.leader.election.enable
来改变这一行为)。

虽然多副本机制解决了单机故障问题,但也带来了多副本间数据同步一致性的问题。Kafka 通过以下几种措施解决了这些问题:

  1. 高水位更新机制

  2. 副本同步机制

  3. Leader Epoch


下面我们依次详细介绍这些措施。


HW 和 LEO

首先,我们来看两个与 Kafka 日志相关的重要概念:HW(High Watermark,高水位)和 LEO(Log End Offset,日志末端偏移量)。

  • HW(High Watermark,高水位):表示已经提交(commit)的最大日志偏移量。在 Kafka 中,某条日志“已提交”意味着 ISR 中所有节点都包含了此条日志,消费者只能消费 HW 之前的数据。

  • LEO(Log End Offset,日志末端偏移量):表示当前日志文件中下一条待写入消息的偏移量。

如下图所示,假设一个日志文件中有8条消息,0至5之间的消息为已提交消息,5至7的消息为未提交消息。日志文件的 HW 为5,表示消费者只能拉取到5之前的消息,而 offset 为5的消息对消费者而言是不可见的。日志文件的 LEO 为8,下一条消息将在此处写入。

注意:所有副本都有对应的 HW 和 LEO,但 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特点:

  • Leader HWmin(所有副本 LEO)
    。Leader 副本不仅要保存自己的 HW 和 LEO,还要保存 follower 副本的 HW 和 LEO。

  • Follower HWmin(follower 自身 LEO,leader HW)
    。Follower 副本只需保存自己的 HW 和 LEO。

为了方便描述,下面将 Leader HW 简记为 HW_L
,Follower HW 简记为 HW_F
,Leader LEO 简记为 LEO_L
,Follower LEO 简记为 LEO_F
。下面我们演示一次完整的 HW LEO 更新流程:

初始状态

  • HW_L = 0

  • LEO_L = 0

  • HW_F = 0

  • LEO_F = 0

Follower 第一次 fetch

  1. Leader 收到 Producer 发来的消息并完成存储,更新 LEO_L = 1

  2. Follower 从 Leader fetch 数据,Leader 收到请求,记录 LEO_F = 0
    ,并尝试更新 HW_L = min(全部副本 LEO) = 0

  3. Leader 返回 HW_L = 0
     和 LEO_L = 1
     给 Follower
    ,Follower 存储消息并更新 LEO_F = 1
    HW = min(LEO_F, HW_L) = 0

Follower 第二次 fetch

  1. Follower 再次从 Leader fetch 数据,Leader 收到请求,记录 LEO_F = 1
    ,并尝试更新 HW_L = min(全部副本 LEO) = 1

  2. Leader 返回 HW_L = 1
     和 LEO_L = 1
     给 Follower
    ,Follower 更新自己的 HW = min(LEO_F, HW_L) = 1

注意事项

在上述更新流程中,Follower 和 Leader 的 HW 更新存在时间差。如果 Leader 节点在此期间发生故障,Follower 的 HW 和 Leader 的 HW 可能会处于不一致状态。如果 Follower 被选为新的 Leader 并且以自己的 HW 为准对外提供服务,则可能带来数据丢失或数据错乱问题。


KIP-101 问题:数据丢失 & 数据错乱

数据丢失

  1. 第1步:副本 B 作为 leader 收到 producer 的 m2 消息并写入本地文件,等待副本 A 拉取。副本 A 发起消息拉取请求,请求中携带自己的最新日志偏移量(LEO=1)。B 收到请求后更新自己的 HW 为1,并将 HW=1 的信息以及消息 m2 返回给 A。A 收到拉取结果后更新本地的 HW 为1,并将 m2 写入本地文件。随后,A 发起新一轮拉取请求(LEO=2),B 收到 A 的拉取请求后更新自己的 HW 为2,由于没有新数据,只将 HW=2 的信息返回给 A,并回复 producer 写入成功。此时的状态如图中第一步所示。

  2. 第2步:正常情况下,A 会收到 B 的回复,得知当前的 HW 为2,然后更新自身的 HW 为2。但在第2步中,A 重启了,没有来得及收到 B 的回复,此时 B 仍然是 leader。A 重启后会以 HW 为标准截断自己的日志,因为 A 作为 follower 不知道多出的日志是否已被提交,为了防止数据不一致,A 会截断多余的数据并尝试从 leader 那里重新同步。

  3. 第3步:B 崩溃了,由于 min.isr
    设置为1,Zookeeper 会从 ISR 中选择一个新的 leader,即 A。然而,A 的数据不完整,导致数据丢失。问题在于 A 重启后以 HW 为标准截断了多余的日志。如果不截断会怎样?不行,因为这些日志可能没有被提交(即没有被 ISR 中的所有节点写入),如果保留会导致日志错乱。

总结

数据丢失的根本原因在于 A 重启后以 HW 为标准截断了多余的日志。如果不截断这些日志,可能会导致日志错乱,因为这些日志可能没有被提交。因此,Kafka 设计了这种截断机制来确保数据的一致性。


数据错乱

在分析日志错乱的问题之前,我们需要了解 Kafka 的副本可靠性保证有一个前提:在 ISR 中至少有一个节点。如果所有节点都宕机,Kafka 不保证数据的可靠性,这种情况下会出现数据丢失,数据丢失是可以接受的。然而,我们在这里分析的问题比数据丢失更加严重,会引发日志错乱甚至导致整个系统异常,这是不可接受的。

数据错乱的具体场景

  1. 第1步:A 和 B 均为 ISR 中的节点。副本 A 作为 leader,收到 producer 的消息 m2 请求后写入 PageCache,并在某个时刻刷新到本地磁盘。副本 B 拉取到 m2 后写入 PageCache(尚未刷盘),然后再次从 A 中拉取新消息并告知 A 自己的 LEO=2。A 收到后更新自己的 HW 为1,并回复 producer 写入成功。此时 A 和 B 同时宕机,B 的 m2 由于尚未刷盘,所以 m2 消息丢失。此时的状态如图中第一步所示。

  2. 第2步:由于 A 和 B 均宕机,而 min.isr=1
    并且 unclean.leader.election.enable=true
    (关闭 unclean 选择策略),Kafka 会等待第一个 ISR 中的节点恢复并选为 leader。不幸的是,B 被选为 leader,并且收到了 producer 发来的新消息 m3。注意,这里丢失 m2 消息是可接受的,因为所有节点都宕机了。

  3. 第3步:A 恢复重启后发现自己是 follower,且 HW 为2,没有多余的数据需要截断,所以开始和 B 进行新一轮的同步。但此时 A 和 B 均没有意识到 offset 为1的消息不一致了。

问题分析

问题在于日志的写入是异步的。Kafka 的副本策略设计中,消息的持久化是异步的,这会导致在某些情况下被选出的 leader 不一定包含所有数据,从而引发日志错乱的问题。

Leader Epoch 解决方案

为了解决上述缺陷,Kafka 引入了 Leader Epoch 的概念。Leader Epoch 类似于 Raft 中的任期号,每次重新选择 leader 时,用一个严格单调递增的 ID 来标记,这样所有 follower 都能意识到 leader 的变化。follower 不再以 HW 为准,每次崩溃重启后都需要去 leader 那边确认当前 leader 的日志是从哪个 offset 开始的。下面看下 Leader Epoch 是如何解决上述两个问题的。


数据丢失解决

关键在于副本 A 重启后作为 follower 时,不是立即以 HW 为准截断自己的日志,而是先发起 LeaderEpochRequest
询问副本 B 第0代的最新偏移量。副本 B 会返回自己的 LEO 为2,这时副本 A 就知道消息 m2 不能被截断,因此 m2 得到了保留。当 A 被选为 leader 时,就保留了所有已提交的日志,从而解决了日志丢失的问题。如果在发起 LeaderEpochRequest
时 A 又宕机了怎么办?这种情况下,不会出现日志丢失,因为副本 A 被选为 leader 后不会截断自己的日志,日志截断只会发生在 follower 身上。

数据错乱解决

关键仍然在第3步,当副本 A 重启作为 follower 时,第一步仍然是发起 LeaderEpochRequest
询问 leader 当前第0代的最新偏移量。由于副本 B 已经经过换代,会返回给 A 第1代的起始偏移量(即1)。A 发现冲突后会截断自己偏移量为1的日志,并重新开始与 leader 同步。这样,副本 A 和副本 B 的日志达到了一致,解决了日志错乱的问题。

3.2.3 小结

Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功。然而,通过副本机制并结合 ACK 策略,可以大大降低单机宕机带来的数据丢失风险。通过 HW、副本同步机制和 Leader Epoch 等多种措施,Kafka 解决了多副本间的数据同步一致性问题,最终实现了 Broker 数据的可靠持久化。


3.3 消费者从 Broker 消费到消息且最好只消费一次

在消费者从 Broker 消费消息的过程中,消费者需要向 Kafka 汇报自己的位移数据(offset)。只有当消费者向 Kafka 汇报了消息的位移,该消息才会被 Broker 认为已经被消费。因此,消费者端消息的可靠性主要取决于 offset 的提交方式。Kafka 消费端提供了两种消息提交方式:

由于存在重复消费的可能性,正常情况下我们通常需要在应用层面实现幂等控制,以确保消息即使被重复消费也不会产生不良影响。

04


Kafka高性能探究


Kafka 高性能的核心在于保障系统低延迟、高吞吐地处理消息。为此,Kafka 采用了许多精妙的设计,包括异步发送、批量发送、压缩技术、PageCache 机制与顺序追加落盘、零拷贝、稀疏索引、Broker 与数据分区、多 Reactor 多线程网络模型。

4.1 异步发送

如上文所述,Kafka 提供了异步和同步两种消息发送方式。在异步发送中,整个流程都是异步的。调用异步发送方法后,消息会被写入一个 channel,然后立即返回成功。Dispatcher 协程会从 channel 轮询消息,将其发送到 Broker,同时会有另一个异步协程负责处理 Broker 返回的结果。同步发送本质上也是异步的,但在处理结果时,同步发送通过 waitGroup
将异步操作转换为同步。使用异步发送可以最大化提高消息发送的吞吐能力。

4.2 批量发送

Kafka 支持批量发送消息,将多个消息打包成一个批次进行发送,从而减少网络传输的开销,提高网络传输的效率和吞吐量。Kafka 的批量发送消息是通过以下两个参数来控制的:

  • batch.size:控制批量发送消息的大小,默认值为16KB。适当增加 batch.size
     参数值可以提升吞吐量,但需要注意,如果批量发送的大小设置得过大,可能会导致消息发送的延迟增加,因此需要根据实际情况进行调整。

  • linger.ms:控制消息在批量发送前的等待时间,默认值为0。当 linger.ms
     大于0时,如果有消息发送,Kafka 会等待指定的时间,如果等待时间到达或者批量大小达到 batch.size
    ,就会将消息打包成一个批次进行发送。适当增加 linger.ms
     参数值可以提升吞吐量,例如设置为10~100毫秒。

在 Kafka 的生产者客户端中,当发送消息时,如果启用了批量发送,Kafka 会将消息缓存到缓冲区中。当缓冲区中的消息大小达到 batch.size
或者等待时间到达 linger.ms
时,Kafka 会将缓冲区中的消息打包成一个批次进行发送。如果在等待时间内没有达到 batch.size
,Kafka 也会将缓冲区中的消息发送出去,从而避免消息积压。

4.3 压缩技术

Kafka 支持压缩技术,通过将消息进行压缩后再进行传输,从而减少网络传输的开销。压缩和解压缩的过程会消耗一定的 CPU 资源,因此需要根据实际情况进行调整。Kafka 支持多种压缩算法,通过配置参数 compression.type
(默认值为 none
,表示不进行压缩)控制。在 Kafka 2.1.0 版本之前,仅支持 GZIP、Snappy 和 LZ4,2.1.0 版本后还支持 Zstandard 算法(Facebook 开源,能够提供超高压缩比)。这些压缩算法的性能对比(两指标都是越高越好)如下:

  • 吞吐量:LZ4 > Snappy > zstd 和 GZIP

  • 压缩比:zstd > LZ4 > GZIP > Snappy

在 Kafka 的生产者客户端中,当发送消息时,如果启用了压缩技术,Kafka 会将消息进行压缩后再进行传输。在消费者客户端中,如果消息进行了压缩,Kafka 会在消费消息时将其解压缩。需要注意的是,如果 Broker 设置了与生产者不同的压缩算法,接收消息后会解压后重新压缩保存。此外,如果存在消息版本兼容问题,也会触发解压后再压缩。


4.4 PageCache 机制 & 顺序追加落盘

为了提升系统吞吐和降低时延,Kafka 在 Broker 接收到消息后,只是将数据写入 PageCache 后便认为消息已写入成功。PageCache 中的数据通过 Linux 的 flusher 程序进行异步刷盘,避免了同步刷盘带来的巨大系统开销,将数据顺序追加写入磁盘日志文件中。由于 PageCache 在内存中进行缓存,读写速度非常快,可以大大提高读写效率。顺序追加写充分利用了顺序 I/O 写操作的优势,避免了缓慢的随机 I/O 操作,从而有效提升 Kafka 的吞吐量。

如下图所示,消息被顺序追加到每个分区日志文件的尾部。

4.5 零拷贝

Kafka 中存在大量的网络数据持久化到磁盘(Producer 到 Broker)和磁盘文件通过网络发送(Broker 到 Consumer)的过程,这一过程的性能直接影响 Kafka 的整体吞吐量。传统的 IO 操作存在多次数据拷贝和上下文切换,性能较低。Kafka 利用零拷贝技术提升上述过程的性能,其中网络数据持久化到磁盘主要使用 mmap 技术,网络数据传输环节主要使用 sendfile 技术。

4.5.1 索引加速之 mmap

在传统模式下,数据从网络传输到文件需要 4 次数据拷贝、4 次上下文切换和两次系统调用。如下图所示:

为了减少上下文切换和数据拷贝带来的性能开销,Kafka 使用 mmap 来处理其索引文件。Kafka 中的索引文件用于在提取日志文件中的消息时进行高效查找。这些索引文件被维护为内存映射文件,这允许 Kafka 快速访问和搜索内存中的索引,从而加速在日志文件中定位消息的过程。mmap 将内核中读缓冲区(read buffer)的地址与用户空间的缓冲区(user buffer)进行映射,实现内核缓冲区与应用程序内存的共享,省去了将数据从内核读缓冲区拷贝到用户缓冲区的过程。整个拷贝过程发生 4 次上下文切换、1 次 CPU 拷贝和 2 次 DMA 拷贝。

4.5.2 网络数据传输之 sendfile

传统方式实现时,先读取磁盘、再用 socket 发送,实际也经过四次拷贝。如下图所示:

为了减少上下文切换和数据拷贝带来的性能开销,Kafka 在 Consumer 从 Broker 读数据过程中使用了 sendfile 技术。具体实现方案是通过 NIO 的 transferTo/transferFrom
调用操作系统的 sendfile 实现零拷贝。总共发生 2 次内核数据拷贝、2 次上下文切换和一次系统调用,消除了 CPU 数据拷贝,如下:

4.6 稀疏索引

为了方便对日志进行检索和过期清理,Kafka 日志文件除了用于存储日志的 .log
文件外,还有两个索引文件:位移索引文件 .index
和时间戳索引文件 .timeindex
。这三个文件的名字完全相同,如下:

Kafka 的索引文件是按照稀疏索引的思想设计的。稀疏索引的核心是不会为每个记录都保存索引,而是写入一定数量的记录之后才会增加一个索引值,具体间隔由 log.index.interval.bytes
参数控制,默认大小为 4 KB,意味着 Kafka 至少写入 4 KB 消息数据之后,才会在索引文件中增加一个索引项。单条消息的大小会影响 Kafka 索引的插入频率,因此 log.index.interval.bytes
是 Kafka 调优的一个重要参数值。由于索引文件也是按消息的顺序性增加索引项,Kafka 可以利用二分查找算法来搜索目标索引项,将时间复杂度降至 O(log N),大大减少了查找时间。

位移索引文件 .index

位移索引文件的索引项结构如下:

  • 相对位移:保存于索引文件名字上的起始位移的差值。假设一个索引文件为 00000000000000000100.index
    ,起始位移值为 100,当存储位移为 150 的消息索引时,在索引文件中的相对位移为 150 - 100 = 50。这样做的好处是使用 4 字节保存位移即可,节省了大量的磁盘空间。

  • 文件物理位置:消息在 log 文件中保存的位置。Kafka 可根据消息位移,通过位移索引文件快速找到消息在 log 文件中的物理位置,从而快速从 log 文件中找到对应的消息。

假设 Kafka 需要找出位移为 3550 的消息,首先会使用二分查找算法找到小于 3550 的最大索引项 [3528, 2310272]
,然后根据该索引项的文件物理位置在 log 文件中从位置 2310272 开始顺序查找,直至找到位移为 3550 的消息记录。

时间戳索引文件 .timeindex

Kafka 在 0.10.0.0 以后的版本中,消息中增加了时间戳信息,为了满足用户根据时间戳查询消息记录的需求,Kafka 增加了时间戳索引文件。时间戳索引文件的索引项结构如下:

时间戳索引文件的检索与位移索引文件类似,如下快速检索消息示意图:

4.7 Broker & 数据分区

Kafka 集群包含多个 Broker。一个 topic 下通常有多个 partition,partition 分布在不同的 Broker 上,用于存储 topic 的消息。这使得 Kafka 可以在多台机器上处理和存储消息,提供了并行的消息处理能力和横向扩展能力。

4.8 多 Reactor 多线程网络模型

多 Reactor 多线程网络模型是一种高效的网络通信模型,它采用了多个 Reactor 线程和多个工作线程来处理网络请求,可以充分利用多核 CPU 的性能,提高系统的吞吐量和响应速度。Kafka 为了提升系统的吞吐,在 Broker 端处理消息时采用了该模型,示意如下:

  • SocketServer:实现 Reactor 模式,用于处理多个 Client(包括客户端和其他 Broker 节点)的并发请求,并将处理结果返回给 Client。

  • KafkaRequestHandlerPool:Reactor 模式中的 Worker 线程池,定义了多个工作线程,用于处理实际的 I/O 请求逻辑。

整个服务端处理请求的流程大致分为以下几个步骤:

  1. Acceptor 接收客户端发来的请求。

  2. 轮询分发 给 Processor 线程处理。

  3. Processor 将请求封装成 Request 对象,放到 RequestQueue 队列。

  4. KafkaRequestHandlerPool 分配工作线程,处理 RequestQueue 中的请求。

  5. KafkaRequestHandler 线程处理完请求后,将响应 Response 返回给 Processor 线程。

  6. Processor 线程将响应返回给客户端。

05


其他知识探究


5.1 负载均衡

5.1.1 生产者负载均衡

Kafka 生产端的负载均衡主要指如何将消息发送到合适的分区。Kafka 生产者在生产消息时,根据分区器将消息投递到指定的分区中,因此 Kafka 的负载均衡很大程度上依赖于分区器。Kafka 默认的分区器是 DefaultPartitioner
,其分区策略如下:

  • 如果 key 不为 null:对 Key 值进行哈希计算,从所有分区中根据 Key 的哈希值计算出一个分区号。拥有相同 Key 值的消息会被写入同一个分区,这是实现顺序消息的关键。

  • 如果 key 为 null:消息将以轮询的方式,在所有可用分区中分别写入消息。

如果不想使用 Kafka 默认的分区器,用户可以实现 Partitioner
接口,自行实现分区方法。

5.1.2 消费者负载均衡

在 Kafka 中,每个分区(Partition)只能由一个消费者组中的一个消费者消费。当消费者组中有多个消费者时,Kafka 会自动进行负载均衡,将分区均匀地分配给每个消费者。Kafka 中的消费者负载均衡算法可以通过设置消费者组的 partition.assignment.strategy
参数来选择。目前主流的分区分配策略有以下几种:

  • Range:在保证均衡的前提下,将连续的分区分配给消费者,对应的实现是 RangeAssignor

  • Round-Robin:在保证均衡的前提下,轮询分配,对应的实现是 RoundRobinAssignor

  • StickyAssignor:从 0.11.0.0 版本引入的一种新的分区分配策略,其优势在于能够在保证分区均衡的前提下尽量保持原有的分区分配结果,从而避免许多冗余的分区分配操作,减少分区再分配的执行时间。

5.2 集群管理

Kafka 借助 ZooKeeper 进行集群管理。Kafka 中的许多信息都在 ZooKeeper 中维护,包括 broker 集群信息、consumer 集群信息、topic 相关信息、partition 信息等。Kafka 的许多功能也是基于 ZooKeeper 实现的,如 partition 选主、broker 集群管理、consumer 负载均衡等。限于篇幅,本文将不展开陈述,这里附上一张网上截图供大家参考:

文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论