一、什么是Kafka
Apache Kafka 是一款开源的消息引擎系统(也是分布式流处理平台,这部分目前没什么理解),它的主要功能是提供一套完备的消息发布与订阅解决方案。通俗点可以这么理解:系统 A 发送消息给「消息引擎系统」,系统 B 从「消息引擎系统」中读取 A 发送的消息。

二、Kafka的作用
上面的通俗理解中,为什么系统 A 不能直接发送消息给系统 B,中间还要隔一个「消息引擎系统」呢?
「消息引擎系统」的作用就是“削峰填谷”,其含义如下:
指缓冲上下游瞬时突发流量,使其更平滑。特别是对于那种发送能力很强的上游系统,如果没有消息引擎的保护,“脆弱”的下游系统可能会直接被压垮导致全链路服务“雪崩”。但是,一旦有了消息引擎,它能够有效地对抗上游的流量冲击,真正做到将上游的“峰”填满到“谷”中,避免了流量的震荡。消息引擎系统的另一大好处在于发送方和接收方的松耦合,这也在一定程度上简化了应用的开发,减少了系统间不必要的交互。
三、Kafka的核心术语和消息架构
1. 核心术语
消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
生产者:Producer。向主题发布新消息的应用程序。消费者:Consumer。从主题订阅新消息的应用程序。
消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
2. 消息架构
Kafka 的三层消息架构:
第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。最后,客户端程序只能与分区的领导者副本进行交互。
副本的工作机制:
生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。
至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步,从而保证数据的持久化或消息不丢失。
【图例】

四、Kafka重要参数
1. Broker 端参数
(1)与存储信息相关的参数
Broker 是需要配置存储信息的,即 Broker 使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:
a. log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。这个参数是没有默认值的,这说明它必须要指定。
b. log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。
配置思路&方法:
在线上生产环境中我们只需要设置 log.dirs,即第一个参数就好了,不要设置 log.dir。
而且更重要的是,在线上生产环境中一定要为 log.dirs 配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如:/home/kafka1,/home/kafka2,/home/kafka3 这样。
如果有条件的话我们最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:
提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
能够实现故障转移:即 Failover。Kafka Broker 使用的任何一块磁盘挂掉了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。
(2)与 ZooKeeper 相关的参数
ZooKeeper 是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些 Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。
zookeeper.connect:这也是一个 CSV 格式的参数,比如我可以指定它的值为:zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper 的默认端口。
如果多个 Kafka 集群使用同一套 ZooKeeper 集群,那么这个参数应该如下设置:
如果有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的 zookeeper.connect 参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot 只需要写一次,而且是加到最后的。
(3)与 Broker 连接相关的参数
客户端程序或其他 Broker 如何与该 Broker 进行通信的设置。有以下三个参数:
a. listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
b. advertised.listeners:和 listeners 相比多了个 advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。
c. host.name/port:这两个参数过期了,不用设置。
监听器参数从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。这里的协议名称可能是标准的名字,比如 PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是我们自己定义的协议名字,比如CONTROLLER: localhost:9092。
一旦我们自己定义了协议名称,我们必须还要指定 listener.security.protocol.map 参数告诉这个协议底层使用了哪种安全协议,比如指定 listener.security.protocol.map=CONTROLLER:PLAINTEXT 表示CONTROLLER 这个自定义协议底层使用明文不加密传输数据。
【注意】主机名这个设置中到底使用 IP 地址还是主机名?这里大牛给出统一的建议:最好全部使用主机名,即 Broker 端和 Client 端应用配置中全部填写主机名。Broker 源代码中也使用的是主机名,如果在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。
(4)与 Topic 管理相关的参数
a. auto.create.topics.enable:是否允许自动创建 Topic。
b. unclean.leader.election.enable:是否允许 Unclean Leader 选举。
c. auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。
参数详解:
auto.create.topics.enable 参数建议最好设置成 false,即不允许自动创建 Topic。否则在线上环境里面可能就会有很多名字稀奇古怪的 Topic。
unclean.leader.election.enable 参数建议设置成 false,即关闭自动 Leader 选举。
每个 Broker 有多个分区,每个分区都有多个副本来保证高可用,在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本。
事实是并不是所有副本都有资格竞争 Leader,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格参与竞争。
那么问题来了,假设那些保存数据比较多的副本都挂了怎么办?我们还要不要进行 Leader 选举了?此时这个参数就派上用场了。
如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。
反之如果是 true,那么 Kafka 允许从那些“跑得慢”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全。
auto.leader.rebalance.enable 参数建议设置成 false,禁止 Kafka 定期对一些 Topic 分区进行 Leader 重选举。
这个参数对生产环境影响非常大,设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。
严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若 auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。
换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此建议在生产环境中把这个参数设置成 false。
(5)与数据留存相关的参数
a. log.retention.{hours|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hours 最低。
b. log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。
c. message.max.bytes:控制 Broker 能够接收的最大消息大小。
参数详解:
log.retention.{hours|minutes|ms} 参数:虽然 ms 设置有最高的优先级,但是通常情况下我们还是设置 hours 级别的多一些,比如 log.retention.hours=168 表示默认保存 7 天的数据,自动删除 7 天前的数据。
log.retention.bytes 参数:这个参数值默认是 -1,表明想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想我们要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。
message.max.bytes 参数:这个参数默认的 1000012 太少了,还不到 1MB。实际场景中突破 1MB 的消息都是很常见的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。
2. Topic 级别参数
如果同时设置了 Topic 级别参数和全局 Broker 参数,到底听谁的呢?哪个说了算呢?答案就是 Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值,这就是所谓的 Topic 级别参数。
a. retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
b. retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
c. max.message.bytes:决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。
参数详解:
创建 Topic 时进行设置
设想我们部门需要将玩家数据发送到 Kafka 进行处理,需要保存最近半年的玩家数据,同时这些数据很大,通常都有几 MB,但一般不会超过 5MB。现在让我们用以下命令来创建 Topic:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
注意结尾处的 --config 设置,我们就是在 config 后面指定了想要设置的 Topic 级别参数。
修改 Topic 时设置
下面我们用自带的命令 kafka-configs 来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,命令如下:
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
总结:
总体来说,我们只能使用这么两种方式来设置 Topic 级别参数。大神的建议是,我们最好始终坚持使用第二种方式来设置,并且在未来,Kafka 社区很有可能统一使用 kafka-configs 脚本来调整 Topic 级别参数。
3. JVM 参数
a. KAFKA_HEAP_OPTS:指定堆大小。
b. KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。
参数设置:
在启动 Kafka Broker 之前,先设置上这两个环境变量:
将 JVM 堆大小设置成 6GB
如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。
启用方法是指定:-XX:+UseCurrentMarkSweepGC
否则,使用吞吐量收集器
开启方法是指定:-XX:+UseParallelGC
$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true$> bin/kafka-server-start.sh config/server.properties
4. 操作系统参数
① 文件描述符限制
② 文件系统类型
③ Swappiness
④ 提交时间
参数详解:
文件描述符限制:ulimit -n。文件描述符系统资源并不像我们想象的那样昂贵,我们不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。如果设置过小,我们会经常看到“Too many open files”的错误。
文件系统类型:文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。
Swappiness:swap 的调优。很多相关文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。其实还是不要设置成 0 比较好,我们可以设置成一个较小的值。因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,我们至少能够观测到 Broker 性能开始出现急剧下降,从而给我们进一步调优和诊断问题的时间。基于这个考虑,建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。
提交时间:提交时间或者说是 Flush 落盘时间。向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。
五、Kafka客户端原理
1. 生产者消息分区机制原理
为什么分区?
提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份,如下所示:

分区策略:
所谓分区策略是决定生产者将消息发送到哪个分区的算法。
轮询策略
也称 Round-robin 策略,即顺序分配。比如一个主题下有 3 个分区,那么第一条消息被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第 4 条消息时又会重新开始,即将其分配到分区 0。如下图:

轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果我们未指定partitioner.class参数,那么我们的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
随机策略
也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上。如下图:

如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return ThreadLocalRandom.current().nextInt(partitions.size());
先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
按消息键保序策略
Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等;也可以用来表征消息元数据。
一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。如下图:

实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return Math.abs(key.hashCode()) % partitions.size();
分区总结:
如果消息指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略。
2. 生产者压缩
在 Kafka 中,压缩可能放生在两个地方:生产端和 Broker 端。
消息压缩能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。
生产者程序中配置 compression.type 参数即表示启用指定类型的压缩算法。
在生产者端启用压缩是很自然的想法,那为什么在 Broker 端也可能进行压缩呢?其实大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但这里的“大部分情况”也是要满足一定条件的。有两种例外情况就可能让 Broker 重新压缩消息。
【情形一】:Broker 端指定了和 Producer 端不同的压缩算法:Producer 使用 GZIP 进行压缩,Broker 这边接收的消息必须使用 Snappy 算法进行压缩
这种情况下 Broker 接收到 GZIP 压缩消息后,只能解压缩然后使用 Snappy 重新压缩一遍。Broker 端也有一个参数叫 compression.type,但是这个参数的默认值是 producer,这表示 Broker 端会“尊重”Producer 端使用的压缩算法。可一旦在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升。
【情形二】Broker 端发生了消息格式转换
所谓的消息格式转换主要是为了兼容老版本的消费者程序。在一个生产环境中,Kafka 集群中很可能同时保存多种版本的消息格式。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。一般情况下这种消息格式转换对性能是有很大影响。
什么时候解压?
有压缩必有解压缩。通常来说解压缩发生在消费者程序中,也就是说 Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。
那么 Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当 Consumer 读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。用一句话总结一下压缩和解压缩:Producer 端压缩、Broker 端保持、Consumer 端解压缩。
各种压缩算法对比:
在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP
在压缩比方面,zstd > LZ4 > GZIP > Snappy
3. Kafka 生产者程序概览
Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常开发一个生产者的步骤有 4 步:
第 1 步:构造生产者对象所需的参数对象。
第 2 步:利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
第 3 步:使用 KafkaProducer 的 send 方法发送消息。
第 4 步:调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。
4. 何时创建 TCP 连接?
在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。
【TODO】这部分没有彻底理解,需要继续深入,通过实践打印启动日志验证理解。
5. 何时关闭 TCP 连接?
Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。
这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用 producer.close() 方法来关闭。
第二种是 Kafka 帮你关闭,这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。
六、学习小结
本次学习对 Kafka 的原理和之前模糊不清的基础知识有了比较清楚的理解,比如一些参数的配置是怎么回事,会带来什么影响,以及未来在实践中怎么去调配这些参数,出现问题怎么去排查等会有头绪。但还有不少基础知识没有学习归纳出来,比如监控,安全,调优等,需要持续学习和补充。
这次学习的是 Kafka 的理论知识,没有结合实践操作去加深理解,虽然现在项目中用到了 Kafka,但涉及到的内容还很小,用到最多的是常用的几个 API 及相关用法。
所以,这次学习不是到此为止了,仅是个开始,接下来自己会通过看实战方面的书籍来边学习边实践,争取更加深入的掌握 Kafka 应用相关知识。




