
[Author] Zhan Yulin, engineer, Open Source Software Support Group, Information Technology Department, China Minsheng Bank
Related reading: Kafka fault tolerance and high availability
Environment preparation
Prepare the Docker images needed for the tests
The tests use the latest release of each of the three major Confluent series, so that the behavior of different versions can be compared under the same test scenarios.
docker pull confluentinc/cp-zookeeper:7.0.0
docker pull confluentinc/cp-kafka:5.5.6
corresponds to Apache Kafka® 2.5.1
docker pull confluentinc/cp-kafka:6.2.1
corresponds to Apache Kafka® 2.8.0
docker pull confluentinc/cp-kafka:7.0.0
corresponds to Apache Kafka® 3.0.0
Prepare the tooling for the test environment
Install the test tool blockade; see https://blockade.readthedocs.io/en/latest/install.html for details
Prepare the test scripts
https://github.com/zhan-yl/ChaosTestingCode.git
Test preparation
Definition of data loss:
The producer has received an acknowledgment from Kafka, so it treats the data as successfully written and will not retry it. If some Kafka-side fault then makes this already-acknowledged data unrecoverable (excluding an unrecoverable failure of every replica), we count that as data loss in these tests.
Test method:
Client side: count the messages that were sent to Kafka and acknowledged as successful
Kafka side: every test creates a fresh single-partition topic, so kafka.tools.GetOffsetShell reports exactly how many messages Kafka actually stored
Test process: inject various production-style faults while the client is writing data
Comparison: if the client's count of acknowledged messages is greater than the number of messages Kafka actually stored, data has been lost (the client-side counting is sketched below)
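The client-side numbers in the scenarios below come from the producer's delivery callbacks. The following is only a rough sketch of that counting approach, not the repository's producer.py; it assumes the confluent_kafka Python client and a placeholder bootstrap address.

import sys
import time
from confluent_kafka import Producer

# usage: python producer.py <count> <pause-seconds> <acks> <topic>
msg_count, pause, acks, topic = int(sys.argv[1]), float(sys.argv[2]), sys.argv[3], sys.argv[4]
delivered, failed = 0, 0

def delivery_report(err, msg):
    # invoked once per message when the broker acknowledgment (or final error) arrives
    global delivered, failed
    if err is None:
        delivered += 1
    else:
        failed += 1

p = Producer({'bootstrap.servers': 'localhost:9092', 'acks': acks})
for i in range(msg_count):
    p.produce(topic, str(i).encode('utf-8'), callback=delivery_report)
    p.poll(0)            # serve pending delivery callbacks
    time.sleep(pause)
p.flush()                # block until every outstanding message is acknowledged or failed
print(f"Sent: {msg_count}  Delivered: {delivered}  Failed: {failed}")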
测试操作流程
每次测试时重新初始化整个集群,集群中通过容器建立kafka、zookeeper节点,集群创建完毕后创建kafka的单分区topic test1。在多个不同kafka版本间进行对比测试
在一个窗口持续模拟向kafka的topic写入大量数据
在另一个窗口对集群的broker节点模拟产生各种故障:节点宕机、网络缓慢、网络分区等
最终检查kafka集群中实际写入的数据与预期是否相符
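The scenarios below read the partition high watermark with print-hw.sh. For reference, the same end-offset check can also be done from Python; this is a hedged sketch assuming the confluent_kafka client, with a placeholder broker address and group id.

from confluent_kafka import Consumer, TopicPartition

c = Consumer({'bootstrap.servers': 'localhost:19092', 'group.id': 'offset-check'})
# returns the (low, high) watermark offsets of the partition; high is the end offset
low, high = c.get_watermark_offsets(TopicPartition('test1', 0), timeout=10)
print(f"test1:0:{high}")   # same topic:partition:offset shape that print-hw.sh prints
c.close()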
Test premise:
Failed sends are not retried in these tests, so that the statistics are not distorted; in production, however, retrying failed sends is a mandatory part of the pipeline.
Starting the environment for the first time:
blockade up
During the tests, ZooKeeper and Kafka both use local storage, kept under ~/ChaosTestingCode/Kafka/cluster/volumes/
Test scenario 1
Scenario:
ACKS=0: the producer does not wait for any acknowledgment from Kafka before treating a message as successfully sent; during the test the topic leader fails and a leader switch takes place.
Procedure:
The client sends 100000 messages to Kafka with acks set to 0; while it is writing, the topic leader is killed to force a leader switch.
Commands:
Version under test: confluentinc/cp-kafka:5.5.6
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 5da02d8887d5 UP 172.17.0.3 NORMAL
kafka2 34238c04862a UP 172.17.0.4 NORMAL
kafka3 fd202d94bd05 UP 172.17.0.5 NORMAL
zk1 61fc580b7a7e UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
The current leader is node 2; simulate a crash of node 2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 5da02d8887d5 UP 172.17.0.3 NORMAL
kafka2 34238c04862a DOWN UNKNOWN
kafka3 fd202d94bd05 UP 172.17.0.5 NORMAL
zk1 61fc580b7a7e UP 172.17.0.2 NORMAL
The new leader is node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3,1
Check the data actually stored in Kafka
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:99947
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 0 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%6|1638169471.598|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Disconnected (after 19087ms in state UP)
%3|1638169471.598|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Connect to ipv4#172.17.0.4:9092 failed: Connection refused (after 0ms in state CONNECT)
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0
Version under test: confluentinc/cp-kafka:6.2.1
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Scenario conclusion:
Version under test: confluentinc/cp-kafka:5.5.6
Despite the dropped connection and the leader switch, the client reported no failures for its 100000 messages, yet Kafka actually stored only 99947 of them, so 53 messages were lost.
Version under test: confluentinc/cp-kafka:6.2.1
Same result as confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Same result as confluentinc/cp-kafka:5.5.6
Test scenario 2
Scenario:
ACKS=1: the producer treats a message as successfully sent once the topic leader has acknowledged it; during the test the topic leader fails and a leader switch takes place.
Procedure:
The client sends 100000 messages to Kafka with acks set to 1; while it is writing, the topic leader is killed to force a leader switch.
Commands:
Version under test: confluentinc/cp-kafka:5.5.6
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 aa0f36c550e6 UP 172.17.0.3 NORMAL
kafka2 7eaea8c1d88f UP 172.17.0.4 NORMAL
kafka3 d1489c809d78 UP 172.17.0.5 NORMAL
zk1 1d533d285fec UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
The current leader is node 3; simulate a crash of node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 aa0f36c550e6 UP 172.17.0.3 NORMAL
kafka2 7eaea8c1d88f UP 172.17.0.4 NORMAL
kafka3 d1489c809d78 DOWN UNKNOWN
zk1 1d533d285fec UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2
Check the amount of data actually stored in Kafka
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:99944
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
%4|1638172063.335|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected (after 17386ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%6|1638172063.336|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
Success: 39944 Failed: 56
Success: 49944 Failed: 56
Success: 59944 Failed: 56
Success: 69944 Failed: 56
Success: 79944 Failed: 56
Success: 89944 Failed: 56
Success: 99944 Failed: 56
Sent: 100000
Delivered: 99944
Failed: 56
Version under test: confluentinc/cp-kafka:6.2.1
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Scenario conclusion:
Version under test: confluentinc/cp-kafka:5.5.6
With the dropped connection and the leader switch, 56 of the client's 100000 messages failed to send; Kafka actually stored 99944 messages, so no data was lost.
Version under test: confluentinc/cp-kafka:6.2.1
Same result as confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Same result as confluentinc/cp-kafka:5.5.6
Test scenario 3
Scenario:
ACKS=all: the producer treats a message as successfully sent only after all of the topic's in-sync replicas have acknowledged it; during the test the topic leader fails and a leader switch takes place.
Procedure:
The client sends 1000000 messages to Kafka from ten concurrent producer processes, with acks set to all; while they are writing, the topic leader is killed to force a leader switch (a sketch of the concurrent load follows).
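The concurrent load corresponds roughly to running concurrent-producer-silent.sh 100000 0.0001 all test1, i.e. ten producer processes of 100000 messages each. Below is a hedged Python sketch of such a load, not the actual script; it assumes the confluent_kafka client and a placeholder bootstrap address.

import time
from multiprocessing import Process
from confluent_kafka import Producer

def run_producer(msg_count, pause, acks, topic):
    delivered = 0
    def on_delivery(err, msg):
        nonlocal delivered
        if err is None:
            delivered += 1
    p = Producer({'bootstrap.servers': 'localhost:9092', 'acks': acks})
    for i in range(msg_count):
        p.produce(topic, str(i).encode('utf-8'), callback=on_delivery)
        p.poll(0)
        time.sleep(pause)
    p.flush()
    print(f"Acknowledged: {delivered}")

if __name__ == '__main__':
    # ten concurrent producer processes, mirroring the scenario's load
    procs = [Process(target=run_producer, args=(100000, 0.0001, 'all', 'test1'))
             for _ in range(10)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    # the per-process acknowledged counts are then summed and compared with the
    # partition end offset reported by the broker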
Commands:
Version under test: confluentinc/cp-kafka:5.5.6
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 9e5f2c18923c UP 172.17.0.3 NORMAL
kafka2 f6bc773f5325 UP 172.17.0.4 NORMAL
kafka3 3d6fa9e2182b UP 172.17.0.5 NORMAL
zk1 281f10b871ff UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 1,3,2 Isr: 3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:998903
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ concurrent-producer-silent.sh 100000 0.0001 all test1
Runs complete
Acknowledged total: 998833
Version under test: confluentinc/cp-kafka:6.2.1
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Window 1 commands (the blockade kill is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Scenario conclusion:
Version under test: confluentinc/cp-kafka:5.5.6
After the leader switch, Kafka actually stored 998903 messages while the clients received acknowledgments for only 998833. Kafka persisted more messages than it acknowledged, which means 70 messages that were reported to the clients as failed were nevertheless written to the cluster; no data was lost.
Version under test: confluentinc/cp-kafka:6.2.1
Same result as confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Same result as confluentinc/cp-kafka:5.5.6
Test scenario 4
Scenario:
ACKS=1: the producer treats a message as successfully sent once the topic leader has acknowledged it; during the test the topic leader is network-isolated from the other Kafka nodes and from ZooKeeper. The old leader can no longer talk to the other Kafka nodes or to ZooKeeper, but it can still talk to the client.
Procedure:
The client sends 100000 messages to Kafka with acks set to 1; while it is writing, the topic leader is network-isolated.
Commands:
Version under test: confluentinc/cp-kafka:5.5.6
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 d8552041dd61 UP 172.17.0.3 NORMAL
kafka2 7a45eba05d55 UP 172.17.0.4 NORMAL
kafka3 029c6d8ad695 UP 172.17.0.5 NORMAL
zk1 6f3918e245ea UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
The current leader is node 3; network-isolate node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 d8552041dd61 UP 172.17.0.3 NORMAL 2
kafka2 7a45eba05d55 UP 172.17.0.4 NORMAL 2
kafka3 029c6d8ad695 UP 172.17.0.5 NORMAL 1
zk1 6f3918e245ea UP 172.17.0.2 NORMAL 2
The partitioned node 029c6d8ad695 can no longer reach any of the other nodes
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.2"
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
^C
--- 172.17.0.2 ping statistics ---
4 packets transmitted, 0 received, 100% packet loss, time 3053ms
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.3"
PING 172.17.0.3 (172.17.0.3) 56(84) bytes of data.
^C
--- 172.17.0.3 ping statistics ---
4 packets transmitted, 0 received, 100% packet loss, time 3069ms
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.4"
PING 172.17.0.4 (172.17.0.4) 56(84) bytes of data.
^C
--- 172.17.0.4 ping statistics ---
6 packets transmitted, 0 received, 100% packet loss, time 5103ms
The remaining nodes have triggered a new election; the new leader is node 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2
Check from the new leader how much data the Kafka cluster actually received
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:50740
Old leader's logs:
The old leader kafka3 can no longer reach the other nodes. It tries to shrink the ISR to 3, i.e. just itself, but because it cannot reach ZooKeeper the shrink cannot succeed, and it keeps waiting for the ZooKeeper connection.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka3 | tail -50
[2021-11-30 08:33:03,949] WARN Client session timed out, have not heard from server in 19797ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:03,949] INFO Client session timed out, have not heard from server in 19797ms for sessionid 0x1000cfa943c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:05,713] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:17,254] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3. Leader: (highWatermark: 49804, endOffset: 100000). Out of sync replicas: (brokerId: 1, endOffset: 50740) (brokerId: 2, endOffset: 49804). (kafka.cluster.Partition)
[2021-11-30 08:33:22,056] WARN Client session timed out, have not heard from server in 18005ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:22,056] INFO Client session timed out, have not heard from server in 18005ms for sessionid 0x1000cfa943c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:22,157] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-11-30 08:33:23,200] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:41,205] WARN Client session timed out, have not heard from server in 19049ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
...
Logs of the other nodes:
Fetching from the old leader kafka3 fails, so replication from it is stopped and the fetcher is removed. A new election is held and this node becomes the new leader, handling data from offset 50740 onward, and the leader epoch advances to 1.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka1 | tail -50
...
[2021-11-30 08:28:57,308] INFO [RequestSendThread controllerId=1] Controller 1 connected to kafka2:19092 (id: 2 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,827] INFO [Controller id=1] Newly added brokers: , deleted brokers: 3, bounced brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
[2021-11-30 08:32:50,828] INFO [RequestSendThread controllerId=1] Shutting down (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,829] INFO [RequestSendThread controllerId=1] Stopped (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,829] INFO [RequestSendThread controllerId=1] Shutdown completed (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,832] INFO [Controller id=1] Broker failure callback for 3 (kafka.controller.KafkaController)
[2021-11-30 08:32:50,851] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,852] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 1 from offset 50740 with high watermark 49804. Previous leader epoch was 0. (kafka.cluster.Partition)
[2021-11-30 08:32:50,858] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1277039458, epoch=11432) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,864] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,882] INFO [Controller id=1] Updated broker epochs cache: Map(1 -> 26, 2 -> 53) (kafka.controller.KafkaController)
[2021-11-30 08:32:56,368] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-11-30 08:32:56,369] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions triggered by AutoTriggered (kafka.controller.KafkaController)
[2021-11-30 08:37:51,249] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-11-30 08:37:56,370] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-11-30 08:37:56,370] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions triggered by AutoTriggered (kafka.controller.KafkaController)
Stops replicating from the old leader broker 3 and starts replicating from the new leader broker 1 at offset 49794
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka2 | tail -50
...
[2021-11-30 08:32:50,859] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=49794, leaderEpoch=1)) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,864] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,865] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,868] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1851347235, epoch=11294) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-11-30 08:32:50,871] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,871] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,883] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to 49804 has no effect as the largest offset in the log is 49803 (kafka.log.Log)
[2021-11-30 08:38:01,701] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0
Version under test: confluentinc/cp-kafka:6.2.1
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Scenario conclusion:
Version under test: confluentinc/cp-kafka:5.5.6
The original topic leader, broker 3, was network-isolated while data was being written, but it never realized it had lost contact with the other nodes and with ZooKeeper and kept acknowledging the producer's writes, so the producer saw no failures at all. In the end Kafka actually held 50740 messages and 0 sends were reported back as failed, so 100000 - 50740 - 0 = 49260 messages were lost. With acks=1, a network-isolated leader leads to large-scale data loss.
Version under test: confluentinc/cp-kafka:6.2.1
Same result as confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Same result as confluentinc/cp-kafka:5.5.6
Test scenario 5
Scenario:
ACKS=all: the producer treats a message as successfully sent only after all of the topic's in-sync replicas have acknowledged it; during the test the topic leader is network-isolated from the other Kafka nodes and from ZooKeeper. The old leader can no longer talk to the other Kafka nodes or to ZooKeeper, but it can still talk to the client.
Procedure:
The client sends 100000 messages to Kafka with acks set to all; while it is writing, the topic leader is network-isolated.
Commands:
Version under test: confluentinc/cp-kafka:5.5.6
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 5c098b68a8d9 UP 172.17.0.3 NORMAL
kafka2 62a38ec7c939 UP 172.17.0.4 NORMAL
kafka3 f965e49b96bd UP 172.17.0.5 NORMAL
zk1 6a94658d45bf UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 5c098b68a8d9 UP 172.17.0.3 NORMAL 2
kafka2 62a38ec7c939 UP 172.17.0.4 NORMAL 2
kafka3 f965e49b96bd UP 172.17.0.5 NORMAL 1
zk1 6a94658d45bf UP 172.17.0.2 NORMAL 2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:36577
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka3 | tail -50
[2021-12-01 06:18:34,639] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:18:34,706] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-01 06:18:34,712] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 37 ms (kafka.log.Log)
[2021-12-01 06:18:34,719] INFO Created log for partition test1-0 in /var/lib/kafka/data/test1-0 with properties {} (kafka.log.LogManager)
[2021-12-01 06:18:34,719] INFO [Partition test1-0 broker=3] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2021-12-01 06:18:34,720] INFO [Partition test1-0 broker=3] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-01 06:18:34,722] INFO [Partition test1-0 broker=3] test1-0 starts at leader epoch 0 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-01 06:18:35,814] INFO I/O exception (java.net.NoRouteToHostException) caught when processing request to {}->http://support-metrics.confluent.io:80: No route to host (Host unreachable) (org.apache.http.impl.execchain.RetryExec)
[2021-12-01 06:18:35,814] INFO Retrying request to {}->http://support-metrics.confluent.io:80 (org.apache.http.impl.execchain.RetryExec)
[2021-12-01 06:18:38,887] ERROR Could not submit metrics to Confluent: No route to host (Host unreachable) (io.confluent.support.metrics.utils.WebClient)
[2021-12-01 06:18:38,887] ERROR Failed to submit metrics to Confluent via insecure endpoint=http://support-metrics.confluent.io/anon -- giving up (io.confluent.support.metrics.submitters.ConfluentSubmitter)
[2021-12-01 06:20:04,723] WARN Client session timed out, have not heard from server in 12006ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:04,723] INFO Client session timed out, have not heard from server in 12006ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:06,760] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:24,778] WARN Client session timed out, have not heard from server in 19954ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:24,778] INFO Client session timed out, have not heard from server in 19954ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:26,534] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:28,772] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3. Leader: (highWatermark: 36577, endOffset: 36613). Out of sync replicas: (brokerId: 1, endOffset: 36577) (brokerId: 2, endOffset: 36597). (kafka.cluster.Partition)
[2021-12-01 06:20:42,894] WARN Client session timed out, have not heard from server in 18014ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:42,894] INFO Client session timed out, have not heard from server in 18014ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:42,996] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-01 06:20:44,577] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:02,592] WARN Client session timed out, have not heard from server in 19597ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:02,592] INFO Client session timed out, have not heard from server in 19597ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:03,809] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
...
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka1 | tail -50
...
[2021-12-01 06:20:12,383] INFO [Controller id=1] Newly added brokers: , deleted brokers: 3, bounced brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Shutting down (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Stopped (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,388] INFO [Controller id=1] Broker failure callback for 3 (kafka.controller.KafkaController)
[2021-12-01 06:20:12,408] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,414] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 1 from offset 36577 with high watermark 36577. Previous leader epoch was 0. (kafka.cluster.Partition)
[2021-12-01 06:20:12,421] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,427] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1322674560, epoch=8051) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-01 06:20:12,428] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,428] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,436] INFO [Controller id=1] Updated broker epochs cache: Map(1 -> 26, 2 -> 53) (kafka.controller.KafkaController)
[2021-12-01 06:22:52,664] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-12-01 06:22:52,665] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions triggered by AutoTriggered (kafka.controller.KafkaController)
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka2 | tail -50
...
[2021-12-01 06:20:12,412] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,420] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=36577, leaderEpoch=1)) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,420] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,421] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,423] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=347527133, epoch=7998) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-01 06:20:12,426] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,437] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to offset 36577 (kafka.log.Log)
[2021-12-01 06:20:12,439] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Scheduling segments for deletion List() (kafka.log.Log)
[2021-12-01 06:20:12,440] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,444] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 36577 with message format version 2 (kafka.log.Log)
[2021-12-01 06:20:12,445] INFO [ProducerStateManager partition=test1-0] Writing producer snapshot at offset 36577 (kafka.log.ProducerStateManager)
[2021-12-01 06:21:12,440] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Deleting segments List() (kafka.log.Log)
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 36577 Failed: 63423
Sent: 100000
Delivered: 36577
Failed: 63423
Version under test: confluentinc/cp-kafka:6.2.1
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 2642be3c590c UP 172.17.0.3 NORMAL
kafka2 b226d706b431 UP 172.17.0.4 NORMAL
kafka3 6b225d0834be UP 172.17.0.5 NORMAL
zk1 dd0128271974 UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 TopicId: nO2rXLtXQue3-tqS5skJiQ PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 2642be3c590c UP 172.17.0.3 NORMAL 1
kafka2 b226d706b431 UP 172.17.0.4 NORMAL 2
kafka3 6b225d0834be UP 172.17.0.5 NORMAL 2
zk1 dd0128271974 UP 172.17.0.2 NORMAL 2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 TopicId: nO2rXLtXQue3-tqS5skJiQ PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 1,3,2 Isr: 3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:89649
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60350ms, timeout #0)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60344ms, timeout #1)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60338ms, timeout #2)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60332ms, timeout #3)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60327ms, timeout #4)
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 59638 Failed: 10362
Success: 69638 Failed: 10362
Success: 79638 Failed: 10362
Success: 89638 Failed: 10362
Sent: 100000
Delivered: 89638
Failed: 10362
Scenario conclusion:
Version under test: confluentinc/cp-kafka:5.5.6
Unlike scenario 4 with acks=1, when the leader is network-partitioned the producer now receives explicit send failures.
The producer had 36577 messages acknowledged and Kafka actually stored 36577 messages, so no data was lost.
With ACKS=all, whether the leader switch is caused by a node failure or by a network partition, data loss due to the switch is effectively prevented.
Version under test: confluentinc/cp-kafka:6.2.1
Same result as confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
This version completed the leader switch after the partition faster than the two earlier versions, so the client delivered more data to Kafka; otherwise the result is the same as confluentinc/cp-kafka:5.5.6.
Test scenario 6
Scenario:
ACKS=1: the producer treats a message as successfully sent once the topic leader has acknowledged it; during the test the topic leader is network-isolated from ZooKeeper only. The old leader can no longer talk to ZooKeeper, but it can still talk to the client.
Procedure:
The client sends 100000 messages to Kafka with acks set to 1; while it is writing, the topic leader is network-isolated from ZooKeeper.
Commands:
Version under test: confluentinc/cp-kafka:5.5.6
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 b3d312073984 UP 172.17.0.3 NORMAL
kafka2 563f88a79dc1 UP 172.17.0.4 NORMAL
kafka3 5073f138ee27 UP 172.17.0.5 NORMAL
zk1 3101398f990c UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
The current leader is node 2. The network is split into two partitions, one containing kafka1, kafka2, kafka3 and the other containing kafka1, kafka3, zk1, so the leader loses contact only with ZooKeeper while all of its other communication stays normal.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1,kafka2,kafka3 kafka1,kafka3,zk1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 b3d312073984 UP 172.17.0.3 NORMAL 2
kafka2 563f88a79dc1 UP 172.17.0.4 NORMAL 1
kafka3 5073f138ee27 UP 172.17.0.5 NORMAL 3
zk1 3101398f990c UP 172.17.0.2 NORMAL 4
After the isolation, the old leader has lost contact with ZooKeeper only
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.2"
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
^C
--- 172.17.0.2 ping statistics ---
6 packets transmitted, 0 received, 100% packet loss, time 5111ms
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.3"
PING 172.17.0.3 (172.17.0.3) 56(84) bytes of data.
64 bytes from 172.17.0.3: icmp_seq=1 ttl=64 time=0.049 ms
64 bytes from 172.17.0.3: icmp_seq=2 ttl=64 time=0.055 ms
64 bytes from 172.17.0.3: icmp_seq=3 ttl=64 time=0.056 ms
^C
--- 172.17.0.3 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2052ms
rtt min/avg/max/mdev = 0.049/0.053/0.056/0.006 ms
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.5"
PING 172.17.0.5 (172.17.0.5) 56(84) bytes of data.
64 bytes from 172.17.0.5: icmp_seq=1 ttl=64 time=0.056 ms
64 bytes from 172.17.0.5: icmp_seq=2 ttl=64 time=0.080 ms
64 bytes from 172.17.0.5: icmp_seq=3 ttl=64 time=0.109 ms
^C
--- 172.17.0.5 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2037ms
rtt min/avg/max/mdev = 0.056/0.081/0.109/0.024 ms
The remaining followers hold a new election; the new leader is node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3,1
Check from the new leader how much data Kafka actually stored
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:78427
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0
Version under test: confluentinc/cp-kafka:6.2.1
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Scenario conclusion:
Version under test: confluentinc/cp-kafka:5.5.6
This scenario is similar to scenario 4: once the old leader loses contact with ZooKeeper, ZooKeeper marks it as offline and a new election is triggered. The old leader still accepts write requests and tries to shrink the ISR to just itself, but since it cannot reach ZooKeeper the shrink cannot succeed.
The difference is that the other Kafka nodes stop sending fetch requests to the old leader not because they cannot reach it, but because a new leader has been elected.
The producer had 100000 messages acknowledged while Kafka actually stored only 78427, so 21573 messages were lost.
Version under test: confluentinc/cp-kafka:6.2.1
Same result as confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Same result as confluentinc/cp-kafka:5.5.6
Test scenario 7
Scenario:
ACKS=all: the producer treats a message as successfully sent only after all of the topic's in-sync replicas have acknowledged it; during the test the topic leader is network-isolated from ZooKeeper.
Procedure:
The client sends 100000 messages to Kafka with acks set to all; while it is writing, the topic leader is network-isolated from ZooKeeper.
Commands:
Version under test: confluentinc/cp-kafka:5.5.6
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 825c158231f4 UP 172.17.0.3 NORMAL
kafka2 fe64d2227f35 UP 172.17.0.4 NORMAL
kafka3 4f57c56a3893 UP 172.17.0.5 NORMAL
zk1 bf63b423c3ea UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
The current leader is node 1. The network is split into two partitions, one containing kafka1, kafka2, kafka3 and the other containing kafka2, kafka3, zk1, so the leader loses contact only with ZooKeeper while all of its other communication stays normal.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1,kafka2,kafka3 kafka2,kafka3,zk1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 825c158231f4 UP 172.17.0.3 NORMAL 1
kafka2 fe64d2227f35 UP 172.17.0.4 NORMAL 2
kafka3 4f57c56a3893 UP 172.17.0.5 NORMAL 3
zk1 bf63b423c3ea UP 172.17.0.2 NORMAL 4
The remaining followers hold a new election; the new leader is node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 1,3,2 Isr: 3,2
Check from the new leader how much data Kafka actually stored
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:90057
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60167ms, timeout #0)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60162ms, timeout #1)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60156ms, timeout #2)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60150ms, timeout #3)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60144ms, timeout #4)
%4|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 29 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1638348015.783|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 29 request(s) timed out: disconnect (after 92087ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
...
Success: 70043 Failed: 9957
Success: 80043 Failed: 9957
Success: 90043 Failed: 9957
Sent: 100000
Delivered: 90043
Failed: 9957
Version under test: confluentinc/cp-kafka:6.2.1
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Window 1 commands (the blockade partition is issued while window 2 is still writing data):
No difference from confluentinc/cp-kafka:5.5.6
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
No difference from confluentinc/cp-kafka:5.5.6
Scenario conclusion:
Version under test: confluentinc/cp-kafka:5.5.6
This scenario is similar to scenario 5: once the old leader loses contact with ZooKeeper, ZooKeeper marks it as offline and a new election is triggered. The old leader still accepts write requests and tries to shrink the ISR to just itself, but since it cannot reach ZooKeeper the shrink cannot succeed.
The difference is that the other Kafka nodes stop sending fetch requests to the old leader not because they cannot reach it, but because a new leader has been elected.
The producer had 90043 messages acknowledged while Kafka actually stored 90057, so no data was lost.
Version under test: confluentinc/cp-kafka:6.2.1
Same result as confluentinc/cp-kafka:5.5.6
Version under test: confluentinc/cp-kafka:7.0.0
Same result as confluentinc/cp-kafka:5.5.6
Premise for the subsequent tests
The scenarios above show that acks=0 and acks=1 can both lose data that Kafka has already acknowledged.
An unclean leader election will also inevitably lose data; see https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html for details.
All subsequent tests therefore use the following settings (see the topic-creation sketch below):
acks: all
unclean.leader.election.enable: false (the default in recent versions)
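For reference, here is a hedged sketch of creating the test topic with the unclean-election setting pinned at the topic level; acks=all itself is a producer-side setting passed to the producer as in the earlier sketches. It assumes the confluent_kafka AdminClient and a placeholder broker address; min.insync.replicas is shown because scenario 8 below exercises its default value of 1.

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:19092'})
topic = NewTopic('test1', num_partitions=1, replication_factor=3,
                 config={'unclean.leader.election.enable': 'false',
                         'min.insync.replicas': '1'})
futures = admin.create_topics([topic])
futures['test1'].result()   # raises an exception if topic creation failed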
Test scenario 8
Scenario:
With acks=all, unclean.leader.election.enable=false and min.insync.replicas=1, all followers are kicked out of the ISR because they are too slow, so acks=all effectively degrades to acks=1; the scenario examines what happens if the leader then fails.
Procedure:
The client sends batches of 1000000 messages to Kafka with acks set to all. Slow followers are simulated so that the ISR shrinks to the leader alone, then the topic leader is killed to force a leader switch, and finally the nodes are recovered.
Commands:
Version under test: confluentinc/cp-kafka:5.5.6
Window 1 commands (the blockade slow and blockade kill commands are issued while window 2 is still writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 NORMAL
kafka3 dfe863b7190e UP 172.17.0.5 NORMAL
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
The leader is node 1, so nodes 2 and 3 are made slow to simulate sluggish followers; random network delays now appear between the leader and nodes 2 and 3, while the leader's communication with ZooKeeper stays normal.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka2 kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.2"
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
64 bytes from 172.17.0.2: icmp_seq=1 ttl=64 time=0.047 ms
64 bytes from 172.17.0.2: icmp_seq=2 ttl=64 time=0.050 ms
64 bytes from 172.17.0.2: icmp_seq=3 ttl=64 time=0.053 ms
64 bytes from 172.17.0.2: icmp_seq=4 ttl=64 time=0.137 ms
64 bytes from 172.17.0.2: icmp_seq=5 ttl=64 time=0.053 ms
64 bytes from 172.17.0.2: icmp_seq=6 ttl=64 time=0.055 ms
64 bytes from 172.17.0.2: icmp_seq=7 ttl=64 time=0.057 ms
^C
--- 172.17.0.2 ping statistics ---
7 packets transmitted, 7 received, 0% packet loss, time 6132ms
rtt min/avg/max/mdev = 0.047/0.064/0.137/0.030 ms
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.4"
PING 172.17.0.4 (172.17.0.4) 56(84) bytes of data.
64 bytes from 172.17.0.4: icmp_seq=1 ttl=64 time=0.047 ms
64 bytes from 172.17.0.4: icmp_seq=2 ttl=64 time=1727 ms
64 bytes from 172.17.0.4: icmp_seq=4 ttl=64 time=1206 ms
64 bytes from 172.17.0.4: icmp_seq=5 ttl=64 time=359 ms
64 bytes from 172.17.0.4: icmp_seq=3 ttl=64 time=3054 ms
64 bytes from 172.17.0.4: icmp_seq=6 ttl=64 time=1432 ms
64 bytes from 172.17.0.4: icmp_seq=8 ttl=64 time=0.059 ms
64 bytes from 172.17.0.4: icmp_seq=7 ttl=64 time=1639 ms
64 bytes from 172.17.0.4: icmp_seq=9 ttl=64 time=873 ms
^C
--- 172.17.0.4 ping statistics ---
10 packets transmitted, 9 received, 10% packet loss, time 9056ms
rtt min/avg/max/mdev = 0.047/1143.632/3054.774/920.712 ms, pipe 4
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.5"
PING 172.17.0.5 (172.17.0.5) 56(84) bytes of data.
64 bytes from 172.17.0.5: icmp_seq=1 ttl=64 time=507 ms
64 bytes from 172.17.0.5: icmp_seq=3 ttl=64 time=0.054 ms
64 bytes from 172.17.0.5: icmp_seq=4 ttl=64 time=45.6 ms
64 bytes from 172.17.0.5: icmp_seq=2 ttl=64 time=2179 ms
64 bytes from 172.17.0.5: icmp_seq=5 ttl=64 time=0.058 ms
64 bytes from 172.17.0.5: icmp_seq=6 ttl=64 time=824 ms
64 bytes from 172.17.0.5: icmp_seq=7 ttl=64 time=17.7 ms
64 bytes from 172.17.0.5: icmp_seq=8 ttl=64 time=0.051 ms
64 bytes from 172.17.0.5: icmp_seq=9 ttl=64 time=215 ms
^C
--- 172.17.0.5 ping statistics ---
10 packets transmitted, 9 received, 10% packet loss, time 9066ms
rtt min/avg/max/mdev = 0.051/421.125/2179.973/678.343 ms, pipe 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 SLOW
kafka3 dfe863b7190e UP 172.17.0.5 SLOW
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
Watch the ISR until it has shrunk to the leader alone; the client keeps writing data during this period until it eventually errors out and exits.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka1 test1
...
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
^C
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
After the ISR has shrunk to the leader alone, the client can still write data normally; now simulate a crash of the leader.
Once the leader is down, an unclean switch is not allowed, so no leader switch can take place at all: no new leader can be elected, data can be neither written nor read, the leader shows as none, the ISR stays unchanged, and the producer hangs for a while before failing with timeouts.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Error: No such container: kafka-topics
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
[2021-12-04 02:12:32,587] WARN [AdminClient clientId=adminclient-1] Connection to node 1 (/172.17.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: none Replicas: 1,2,3 Isr: 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: none Replicas: 1,2,3 Isr: 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-controller.sh kafka2 test1
2
Restart and recover the old leader
After the old leader comes back, the previously written data can be read again and nothing is lost; if the leader could not be recovered, all of that data would be gone.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
update-hosts.sh
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 SLOW
kafka3 dfe863b7190e UP 172.17.0.5 SLOW
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:164834
Restore the followers' network. Once all followers are back to normal, they resync from the leader and the ISR returns to full size.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka2 kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 NORMAL
kafka3 dfe863b7190e UP 172.17.0.5 NORMAL
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Simulate a leader failure again. This time a normal leader switch takes place: there are a few write errors, but once the switch completes writes continue normally, the failed node is removed from the ISR, and no data is lost. The new leader is node 2.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b DOWN UNKNOWN
kafka2 42d93122f5c8 UP 172.17.0.4 NORMAL
kafka3 dfe863b7190e UP 172.17.0.5 NORMAL
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3
After the failed old leader recovers, it automatically rejoins the ISR and resyncs from the new leader
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
update-hosts.sh
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 161df85bc82b UP 172.17.0.3 NORMAL
kafka2 42d93122f5c8 UP 172.17.0.4 NORMAL
kafka3 dfe863b7190e UP 172.17.0.5 NORMAL
zk1 e4ac0f4e41c1 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3,1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:1164784
Window 2 commands (run after the Kafka cluster is set up and the topic is created):
Writes while nodes 2 and 3 are slow; the run eventually times out and exits.
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60755ms, timeout #0)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60750ms, timeout #1)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60744ms, timeout #2)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60739ms, timeout #3)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60733ms, timeout #4)
%4|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 125 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
%3|1638583780.169|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 125 request(s) timed out: disconnect (after 77079ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
... ...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 147662
Delivered: 37504
Failed: 10158
%4|1638583794.349|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (547663 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
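For reference, producer.py takes four positional arguments: message count, pause between sends in seconds, acks, and topic. The actual script lives in the test repo; the following is only a minimal sketch of what it plausibly does, pieced together from its output and tracebacks, assuming the confluent_kafka (librdkafka) Python client and made-up broker addresses. Note the real script evidently does not flush on exit, which is why the TERMINATE warnings above report messages still in queue.
import sys
from time import sleep
from confluent_kafka import Producer

msg_count = int(sys.argv[1])       # e.g. 1000000
wait_period = float(sys.argv[2])   # e.g. 0.0001 seconds between sends
acks = sys.argv[3]                 # "0", "1" or "all"
topic = sys.argv[4]                # e.g. "test1"

delivered = 0
failed = 0

def delivery_report(err, msg):
    # Invoked once per message with the broker's acknowledgement or an error.
    global delivered, failed
    if err is None:
        delivered += 1
    else:
        failed += 1
        print(err)

p = Producer({'bootstrap.servers': '172.17.0.3:9092,172.17.0.4:9092,172.17.0.5:9092',
              'acks': acks})

sent = 0
try:
    for data in range(msg_count):
        p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
        sent += 1
        p.poll(0)                  # serve pending delivery callbacks
        sleep(wait_period)
        if sent % 10000 == 0:
            print('Success: %d Failed: %d' % (delivered, failed))
finally:
    print('Sent: %d' % sent)
    print('Delivered: %d' % delivered)
    print('Failed: %d' % failed)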
With the ISR shrunk to the leader alone, simulate a leader crash while data is being written; after the leader fails no more data can be written:
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
%4|1638583911.382|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected (after 75954ms in state UP)
%3|1638583911.382|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: Connection refused (after 0ms in state CONNECT)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%3|1638583978.567|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: No route to host (after 66977ms in state CONNECT)
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 207350
Delivered: 107307
Failed: 43
%4|1638583985.412|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (600000 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
After the network is back to normal and the ISR has recovered to all three nodes, simulate a leader crash. A normal leader switch takes place; only a small number of sends fail during the switch and no data is lost:
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%4|1638584478.892|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected (after 21710ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%6|1638584478.893|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
Success: 49935 Failed: 65
Success: 59935 Failed: 65
Success: 69935 Failed: 65
Success: 79935 Failed: 65
Success: 89935 Failed: 65
Success: 99935 Failed: 65
Success: 109935 Failed: 65
Success: 119935 Failed: 65
Success: 129935 Failed: 65
Success: 139935 Failed: 65
Success: 149935 Failed: 65
Success: 159935 Failed: 65
Success: 169935 Failed: 65
%3|1638584547.911|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: No route to host (after 68719ms in state CONNECT)
Success: 179935 Failed: 65
Success: 189935 Failed: 65
Success: 199935 Failed: 65
Success: 209935 Failed: 65
Success: 219935 Failed: 65
Success: 229935 Failed: 65
Success: 239935 Failed: 65
Success: 249935 Failed: 65
Success: 259935 Failed: 65
Success: 269935 Failed: 65
Success: 279935 Failed: 65
Success: 289935 Failed: 65
Success: 299935 Failed: 65
Success: 309935 Failed: 65
Success: 319935 Failed: 65
Success: 329935 Failed: 65
Success: 339935 Failed: 65
Success: 349935 Failed: 65
Success: 359935 Failed: 65
Success: 369935 Failed: 65
Success: 379935 Failed: 65
Success: 389935 Failed: 65
Success: 399935 Failed: 65
Success: 409935 Failed: 65
Success: 419935 Failed: 65
Success: 429935 Failed: 65
Success: 439935 Failed: 65
Success: 449935 Failed: 65
Success: 459935 Failed: 65
Success: 469935 Failed: 65
Success: 479935 Failed: 65
Success: 489935 Failed: 65
Success: 499935 Failed: 65
Success: 509935 Failed: 65
Success: 519935 Failed: 65
Success: 529935 Failed: 65
Success: 539935 Failed: 65
Success: 549935 Failed: 65
Success: 559935 Failed: 65
Success: 569935 Failed: 65
Success: 579935 Failed: 65
Success: 589935 Failed: 65
Success: 599935 Failed: 65
Success: 609935 Failed: 65
Success: 619935 Failed: 65
Success: 629935 Failed: 65
Success: 639935 Failed: 65
Success: 649935 Failed: 65
Success: 659935 Failed: 65
Success: 669935 Failed: 65
Success: 679935 Failed: 65
Success: 689935 Failed: 65
Success: 699935 Failed: 65
Success: 709935 Failed: 65
Success: 719935 Failed: 65
Success: 729935 Failed: 65
Success: 739935 Failed: 65
Success: 749935 Failed: 65
Success: 759935 Failed: 65
Success: 769935 Failed: 65
Success: 779935 Failed: 65
Success: 789935 Failed: 65
Success: 799935 Failed: 65
Success: 809935 Failed: 65
Success: 819935 Failed: 65
Success: 829935 Failed: 65
Success: 839935 Failed: 65
Success: 849935 Failed: 65
Success: 859935 Failed: 65
Success: 869935 Failed: 65
Success: 879935 Failed: 65
Success: 889935 Failed: 65
Success: 899935 Failed: 65
Success: 909935 Failed: 65
Success: 919935 Failed: 65
Success: 929935 Failed: 65
Success: 939935 Failed: 65
Success: 949935 Failed: 65
Success: 959935 Failed: 65
Success: 969935 Failed: 65
Success: 979935 Failed: 65
Success: 989935 Failed: 65
Success: 999935 Failed: 65
Sent: 1000000
Delivered: 999935
Failed: 65
Test version: confluentinc/cp-kafka:6.2.1
Commands in window one (the blockade slow and blockade kill commands are issued while window two is writing data):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Commands in window two (run after the cluster has been set up and the topic created):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Test version: confluentinc/cp-kafka:7.0.0
Commands in window one (the blockade slow and blockade kill commands are issued while window two is writing data):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Commands in window two (run after the cluster has been set up and the topic created):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Scenario conclusions:
Test version: confluentinc/cp-kafka:5.5.6
With acks=all, unclean.leader.election.enable=false and min.insync.replicas=1, all followers are eventually kicked out of the ISR because they are too slow; the ISR shrinks to the leader alone and acks effectively degrades to 1. Client writes fail while this degradation is in progress and succeed again once the ISR has shrunk.
If the leader then fails, no leader switch can happen because unclean.leader.election.enable=false, so the data can be neither written nor read.
The only option is to wait for the original leader to come back; if it cannot be recovered, all of the data is lost.
Once the original leader recovers, reads and writes work normally again. After the slow-network problem is resolved, a later leader failure results in a normal switch, and the old leader rejoins the cluster when it recovers; no data is lost.
Test version: confluentinc/cp-kafka:6.2.1
No difference from the results with confluentinc/cp-kafka:5.5.6.
Test version: confluentinc/cp-kafka:7.0.0
No difference from the results with confluentinc/cp-kafka:5.5.6.
Test scenario 9
Scenario:
Same as scenario 8, but with min.insync.replicas=2, so that every write is acknowledged only after at least two broker nodes hold a consistent copy of the data.
Procedure:
The client repeatedly sends 1,000,000 messages to Kafka with acks=all. During the runs the followers are first made slow so that the ISR shrinks to the leader alone, the topic leader is then killed to force a leader switch, and finally the nodes are recovered.
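The create-topic-min-insync.sh helper presumably applies min.insync.replicas as a per-topic override at creation time; a hedged equivalent with the stock kafka-topics CLI (the broker address and listener port are assumptions carried over from the other helper scripts) would be:
kafka-topics --create --bootstrap-server kafka1:19092 --topic test1 --partitions 1 --replication-factor 3 --config min.insync.replicas=2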
Commands:
Test version: confluentinc/cp-kafka:5.5.6
Commands in window one (the blockade slow and blockade kill commands are issued while window two is writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic-min-insync.sh kafka1 test1 2
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 NORMAL
kafka2 7893d611b79b UP 172.17.0.4 NORMAL
kafka3 4dfd87691f46 UP 172.17.0.5 NORMAL
zk1 06b129f53213 UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
The current leader is node 3. Simulate a slow network on nodes 1 and 2 so that the ISR shrinks to the leader alone:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka1 kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 SLOW
kafka2 7893d611b79b UP 172.17.0.4 SLOW
kafka3 4dfd87691f46 UP 172.17.0.5 NORMAL
zk1 06b129f53213 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka3 test1
...
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
^C
While the leader is still alive but the ISR has shrunk to the leader alone, writes fail because min.insync.replicas=2 is no longer satisfied, producing code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas". Next, simulate a crash of the leader node:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 SLOW
kafka2 7893d611b79b UP 172.17.0.4 SLOW
kafka3 4dfd87691f46 DOWN UNKNOWN
zk1 06b129f53213 UP 172.17.0.2 NORMAL
After the leader fails, no new leader can be elected, data can be neither written nor read, the ISR stays as it was, and the producer hangs for a while before failing with timeouts. Bring the original leader back:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 SLOW
kafka2 7893d611b79b UP 172.17.0.4 SLOW
kafka3 4dfd87691f46 UP 172.17.0.5 NORMAL
zk1 06b129f53213 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3
After the original leader recovers, the previously written data is available again and nothing is lost; had the leader been unrecoverable, all of the data would have been lost:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:59063
Once all the followers' networks are back to normal they catch up from the leader and the ISR recovers; since the ISR again contains at least min.insync.replicas=2 nodes, writes succeed normally:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka1 kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 NORMAL
kafka2 7893d611b79b UP 172.17.0.4 NORMAL
kafka3 4dfd87691f46 UP 172.17.0.5 NORMAL
zk1 06b129f53213 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Now fail the leader again: a leader switch takes place with a small number of write errors, writing resumes once it completes, the failed node is removed from the ISR, and no data is lost:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 NORMAL
kafka2 7893d611b79b UP 172.17.0.4 NORMAL
kafka3 4dfd87691f46 DOWN UNKNOWN
zk1 06b129f53213 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 2 Replicas: 3,2,1 Isr: 2,1
Then fail the new leader as well. The ISR shrinks to the leader alone and writes fail because min.insync.replicas=2 is not satisfied, again producing code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas". At this point the data can only be read:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3a0f0cb52624 UP 172.17.0.3 NORMAL
kafka2 7893d611b79b DOWN UNKNOWN
kafka3 4dfd87691f46 DOWN UNKNOWN
zk1 06b129f53213 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:209578
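The statement above that the data can still be read while writes are rejected can be checked with a simple consumer. A hedged sketch using the confluent_kafka client (the group id and broker list are made up for illustration):
from confluent_kafka import Consumer

c = Consumer({'bootstrap.servers': '172.17.0.3:9092,172.17.0.4:9092,172.17.0.5:9092',
              'group.id': 'hw-check',            # made-up group id
              'auto.offset.reset': 'earliest'})
c.subscribe(['test1'])
count = 0
while True:
    msg = c.poll(5.0)
    if msg is None:
        break                                     # no more messages within the timeout
    if msg.error():
        print(msg.error())
        continue
    count += 1
print('consumed %d messages' % count)
c.close()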
Commands in window two (run after the cluster has been set up and the topic created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60988ms, timeout #0)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60981ms, timeout #1)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60975ms, timeout #2)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60969ms, timeout #3)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60963ms, timeout #4)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
...
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
...
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
^CTraceback (most recent call last):
File "producer.py", line 68, in <module>
sleep(wait_period)
KeyboardInterrupt
Sent: 9566
Delivered: 0
Failed: 9556
%4|1638601352.387|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 10 messages (40 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
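The NOT_ENOUGH_REPLICAS failures above come back through the producer's delivery callback rather than as exceptions. A hedged sketch (confluent_kafka client assumed; this handler is illustrative and not part of producer.py) of telling this broker-side rejection apart from local timeouts:
from confluent_kafka import KafkaError

def delivery_report(err, msg):
    if err is None:
        return  # write acknowledged by the required number of in-sync replicas
    if err.code() == KafkaError.NOT_ENOUGH_REPLICAS:
        # Broker rejected the write because the ISR is smaller than
        # min.insync.replicas; retries keep failing until the ISR recovers.
        print('rejected by broker, ISR too small: %s' % msg.value())
    elif err.code() == KafkaError._MSG_TIMED_OUT:
        # Local timeout: delivery is unknown, the message may or may not
        # have reached the broker.
        print('timed out locally: %s' % msg.value())
    else:
        print('delivery failed: %s' % err)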
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%3|1638601448.775|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/bootstrap: Connect to ipv4#172.17.0.5:9092 failed: No route to host (after 18562ms in state CONNECT)
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 100000
Delivered: 0
Failed: 0
%4|1638601500.477|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (488895 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%4|1638601740.244|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected (after 22236ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
...
%6|1638601740.246|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY)
Success: 49949 Failed: 51
Success: 59949 Failed: 51
Success: 69949 Failed: 51
Success: 79949 Failed: 51
Success: 89949 Failed: 51
Success: 99949 Failed: 51
Success: 109949 Failed: 51
Success: 119949 Failed: 51
Success: 129949 Failed: 51
Success: 139949 Failed: 51
Success: 149949 Failed: 51
%4|1638601791.642|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Disconnected (after 31598ms in state UP)
%3|1638601791.643|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Connect to ipv4#172.17.0.4:9092 failed: Connection refused (after 0ms in state CONNECT)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
...
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
^CTraceback (most recent call last):
File "producer.py", line 68, in <module>
sleep(wait_period)
KeyboardInterrupt
Sent: 282974
Delivered: 150515
Failed: 132456
%4|1638601861.504|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 3 messages (18 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
Test version: confluentinc/cp-kafka:6.2.1
Commands in window one (the blockade slow and blockade kill commands are issued while window two is writing data):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Commands in window two (run after the cluster has been set up and the topic created):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Test version: confluentinc/cp-kafka:7.0.0
Commands in window one (the blockade slow and blockade kill commands are issued while window two is writing data):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Commands in window two (run after the cluster has been set up and the topic created):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Scenario conclusions:
Test version: confluentinc/cp-kafka:5.5.6
This scenario behaves like scenario 8, except that once the number of brokers in the ISR drops below 2 the data can no longer be written, only read.
Test version: confluentinc/cp-kafka:6.2.1
No difference from the results with confluentinc/cp-kafka:5.5.6.
Test version: confluentinc/cp-kafka:7.0.0
No difference from the results with confluentinc/cp-kafka:5.5.6.
Test scenario 10
Scenario:
Same as scenario 9, but the original leader suffers an unrecoverable disk failure and rejoins the cluster after losing all of its data.
Procedure:
The client repeatedly sends 1,000,000 messages to Kafka with acks=all. The followers are first made slow so that the ISR shrinks to the leader alone, and the topic leader is then killed to force a leader switch. While the original leader is being recovered, an unrecoverable disk failure is simulated so that all data on the original leader is lost.
Commands:
Test version: confluentinc/cp-kafka:5.5.6
Commands in window one (the blockade slow and blockade kill commands are issued while window two is writing data):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic-min-insync.sh kafka1 test1 2
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 UP 172.17.0.3 NORMAL
kafka2 931bd63b61e2 UP 172.17.0.4 NORMAL
kafka3 955172d1a496 UP 172.17.0.5 NORMAL
zk1 94404576cf08 UP 172.17.0.2 NORMAL
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
The current leader is node 1, and a large amount of data has already been written to the cluster:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:109498
Simulate a slow network on the follower nodes (the same blockade slow fault as before):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka2 kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 UP 172.17.0.3 NORMAL
kafka2 931bd63b61e2 UP 172.17.0.4 SLOW
kafka3 955172d1a496 UP 172.17.0.5 SLOW
zk1 94404576cf08 UP 172.17.0.2 NORMAL
The ISR eventually shrinks to the leader node alone:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka1 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
...
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1
^C
At this point the leader node already holds a large amount of data:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:159391
Simulate a crash of the leader node. Although the followers also hold part of the data, it cannot be used (as the next command shows, no leader can be elected):
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 DOWN UNKNOWN
kafka2 931bd63b61e2 UP 172.17.0.4 SLOW
kafka3 955172d1a496 UP 172.17.0.5 SLOW
zk1 94404576cf08 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
[2021-12-04 08:53:54,124] WARN [AdminClient clientId=adminclient-1] Connection to node 1 (/172.17.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: none Replicas: 1,2,3 Isr: 1
Simulate an unrecoverable disk failure on the original leader, losing all of its data:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ sudo rm -rf volumes/kafka/01/data/
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ mkdir -p volumes/kafka/01/data/
Start the original leader node again:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 UP 172.17.0.3 NORMAL
kafka2 931bd63b61e2 UP 172.17.0.4 SLOW
kafka3 955172d1a496 UP 172.17.0.5 SLOW
zk1 94404576cf08 UP 172.17.0.2 NORMAL
The original leader rejoins the cluster normally, but all of the data the cluster previously held is gone:
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:0
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka2 kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:0
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 3d3dacd9b095 DOWN UNKNOWN
kafka2 931bd63b61e2 UP 172.17.0.4 NORMAL
kafka3 955172d1a496 UP 172.17.0.5 NORMAL
zk1 94404576cf08 UP 172.17.0.2 NORMAL
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: test1 Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 2,3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:0
Log from the original leader when it restarts after losing its data:
Because no previous state can be found ("No checkpointed highwatermark is found for partition test1-0"), the log is loaded with an initial high watermark of 0 ("Log loaded for partition test1-0 with initial high watermark 0"), and the partition then "starts at leader epoch 2 from offset 0 with high watermark 0".
[2021-12-05 02:14:05,737] ERROR [Broker id=1] Received LeaderAndIsrRequest with correlation id 1 from controller 3 epoch 2 for partition test1-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:14:05,799] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-05 02:14:05,805] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 34 ms (kafka.log.Log)
[2021-12-05 02:14:05,810] INFO Created log for partition test1-0 in /var/lib/kafka/data/test1-0 with properties {min.insync.replicas=2} (kafka.log.LogManager)
[2021-12-05 02:14:05,811] INFO [Partition test1-0 broker=1] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,811] INFO [Partition test1-0 broker=1] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,811] ERROR [Broker id=1] Received LeaderAndIsrRequest with correlation id 1 from controller 3 epoch 2 for partition __confluent.support.metrics-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:14:05,815] INFO [Log partition=__confluent.support.metrics-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-05 02:14:05,815] INFO [Log partition=__confluent.support.metrics-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)
[2021-12-05 02:14:05,816] INFO Created log for partition __confluent.support.metrics-0 in /var/lib/kafka/data/__confluent.support.metrics-0 with properties {retention.ms=31536000000} (kafka.log.LogManager)
[2021-12-05 02:14:05,816] INFO [Partition __confluent.support.metrics-0 broker=1] No checkpointed highwatermark is found for partition __confluent.support.metrics-0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,816] INFO [Partition __confluent.support.metrics-0 broker=1] Log loaded for partition __confluent.support.metrics-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-05 02:14:10,185] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0, __confluent.support.metrics-0) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:10,191] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 2 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-05 02:14:10,200] INFO [Partition __confluent.support.metrics-0 broker=1] __confluent.support.metrics-0 starts at leader epoch 2 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-05 02:14:11,394] INFO [Partition test1-0 broker=1] Expanding ISR from 1 to 1,2 (kafka.cluster.Partition)
[2021-12-05 02:14:11,401] INFO [Partition test1-0 broker=1] ISR updated to [1,2] and zkVersion updated to [4] (kafka.cluster.Partition)
[2021-12-05 02:14:14,459] INFO [Partition test1-0 broker=1] Expanding ISR from 1,2 to 1,2,3 (kafka.cluster.Partition)
[2021-12-05 02:14:14,462] INFO [Partition test1-0 broker=1] ISR updated to [1,2,3] and zkVersion updated to [5] (kafka.cluster.Partition)
Log from a follower:
After hitting the UNKNOWN_LEADER_EPOCH error, it truncates its local copy of the data ("Truncating to offset 0").
[2021-12-05 02:13:42,140] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test1-0=(fetchOffset=57962, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=115803198, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:45,821] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Connection to node 1 (kafka1/172.17.0.3:19092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-12-05 02:13:45,822] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=115803198, epoch=INITIAL) to node 1: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to kafka1:19092 (id: 1 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:103)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:45,822] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test1-0=(fetchOffset=57962, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=115803198, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to kafka1:19092 (id: 1 rack: null) failed.
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:103)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:50,568] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:53,741] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:57,128] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:59,409] ERROR [Broker id=2] Received LeaderAndIsrRequest with correlation id 3 from controller 3 epoch 2 for partition test1-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:13:59,689] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:14:03,821] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:14:06,095] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:06,096] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=57962, leaderEpoch=2)) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:07,130] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test1-0 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
[2021-12-05 02:14:10,372] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test1-0 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
[2021-12-05 02:14:11,381] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to offset 0 (kafka.log.Log)
[2021-12-05 02:14:11,383] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Scheduling segments for deletion List() (kafka.log.Log)
[2021-12-05 02:14:11,390] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2
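To double-check that a follower really truncated its copy, the remaining segment can be dumped with the stock DumpLogSegments tool. A hedged example (how you reach a shell inside the broker container, and the segment file name, are assumptions; the data directory comes from the log lines above):
kafka-run-class kafka.tools.DumpLogSegments --files /var/lib/kafka/data/test1-0/00000000000000000000.log --print-data-log
After the truncation to offset 0 the dump should show no records for test1-0.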
Commands in window two (run after the cluster has been set up and the topic created):
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Success: 110000 Failed: 0
Success: 120000 Failed: 0
Success: 130000 Failed: 0
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 237429
Delivered: 137429
Failed: 0
%4|1638607813.104|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (600000 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%5|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60011ms, timeout #0)
%5|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60005ms, timeout #1)
%4|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
%3|1638607895.990|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 2 request(s) timed out: disconnect (after 60072ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Traceback (most recent call last):
File "producer.py", line 65, in <module>
p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 110645
Delivered: 1926
Failed: 8719
%4|1638607913.414|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (510646 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
Test version: confluentinc/cp-kafka:6.2.1
Commands in window one (the blockade slow and blockade kill commands are issued while window two is writing data):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Commands in window two (run after the cluster has been set up and the topic created):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Test version: confluentinc/cp-kafka:7.0.0
Commands in window one (the blockade slow and blockade kill commands are issued while window two is writing data):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Commands in window two (run after the cluster has been set up and the topic created):
No difference from the results with confluentinc/cp-kafka:5.5.6.
Scenario conclusions:
Test version: confluentinc/cp-kafka:5.5.6
When the ISR has shrunk to the leader alone for whatever reason (slow node, slow network, node crash, and so on), and that leader then crashes and its disk fails beyond repair, bringing it back with an empty data directory wipes out the entire topic, even though the remaining followers still hold part of the data.
In earlier versions this situation would bring the whole cluster down: the followers would log errors like the following and exit:
log from kafka2
[2021-10-13 06:09:03,601] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 100955 (kafka.server.ReplicaFetcherThread)
log from kafka3
[2021-10-13 06:09:02,856] ERROR [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 100955 (kafka.server.ReplicaFetcherThread)
Test version: confluentinc/cp-kafka:6.2.1
No difference from the results with confluentinc/cp-kafka:5.5.6.
Test version: confluentinc/cp-kafka:7.0.0
No difference from the results with confluentinc/cp-kafka:5.5.6.
Review of the data-loss scenarios
Based on the tests above, here is a summary of the situations in which Kafka can lose data:
acks=1 and a leader failover occurs.
acks=all but unclean failover is allowed, i.e. the follower elected as the new leader is no longer in the ISR.
acks=1 and the leader is partitioned from ZooKeeper, causing a brief split brain.
The ISR has already shrunk to the leader alone, and that leader then becomes isolated from the other Kafka nodes and from ZooKeeper. Even with acks=all the old leader keeps accepting write requests, and that data is lost; this is especially likely when min.insync.replicas=1, and worst of all when the leader also suffers an unrecoverable disk failure: recovering it with empty data eventually wipes out the whole topic.
All replicas of a partition fail at the same time. Because a write is acknowledged as soon as it is in memory, it may not have reached disk yet, so the data is lost when the nodes come back.
These situations can be mitigated to some extent through configuration. Unclean failover can be avoided by setting unclean.leader.election.enable=false, or by setting min.insync.replicas to 2 or more so that the data always has a redundancy of at least 2. Environments that require high reliability usually set acks=all together with min.insync.replicas greater than 1. The price for this is lower write throughput and higher produce latency, and writes become unavailable whenever the ISR falls below min.insync.replicas.
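For reference, a hedged sketch of the settings discussed above for a reliability-first deployment (values are illustrative, not tuned recommendations):
# broker or per-topic settings
unclean.leader.election.enable=false   # never elect an out-of-sync replica as leader
min.insync.replicas=2                  # broker default or per-topic override, as in scenario 9
default.replication.factor=3           # the test topics were created with replication factor 3
# producer side
acks=all                               # wait for all in-sync replicas, subject to min.insync.replicas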