
Kafka Fault Tolerance and High Availability Testing | Operations Advanced

twt Enterprise IT Community 2022-01-26

[Author] Zhan Yulin, Engineer, Open Source Software Support Group, Information Technology Department, China Minsheng Bank

Related reading: Kafka Fault Tolerance and High Availability Principles


Environment Preparation

  • Prepare the Docker images required for testing

    The tests use the latest releases of three major Confluent series, to compare how different versions behave under the same test scenarios.

    • docker pull confluentinc/cp-zookeeper:7.0.0

    • docker pull confluentinc/cp-kafka:5.5.6

      • Corresponds to Apache Kafka® 2.5.1

    • docker pull confluentinc/cp-kafka:6.2.1

      • Corresponds to Apache Kafka® 2.8.0

    • docker pull confluentinc/cp-kafka:7.0.0

      • Corresponds to Apache Kafka® 3.0.0

  • Prepare the tools for building the test environment

    • Install the testing tool blockade; for details see: https://blockade.readthedocs.io/en/latest/install.html

  • Prepare the test scripts

    https://github.com/zhan-yl/ChaosTestingCode.git

  • Test preparation

    • Definition of data loss:

      Once the producer has received Kafka's acknowledgment, it considers the data successfully written and will not retry it. If some failure on the Kafka side then makes that already-acknowledged data unrecoverable (excluding the case where every replica suffers an unrecoverable failure), we count it as data loss in these tests.

    • Test method:

      Client side: count the messages that were successfully sent to Kafka and acknowledged.

      Kafka side: create a fresh single-partition topic for each run; kafka.tools.GetOffsetShell then reports how many messages Kafka actually stored.

      Test procedure: inject various production-style failures while the client is writing data.

      Comparison: if the client's acknowledged count exceeds the number of messages Kafka actually stored, data loss has occurred (a verification sketch follows at the end of this section).

    • Test workflow

      Each test reinitializes the whole cluster. The Kafka and ZooKeeper nodes run in containers; once the cluster is up, a single-partition Kafka topic test1 is created. The same tests are repeated across several Kafka versions for comparison.

      In one window, continuously write a large volume of data to the Kafka topic.

      In another window, inject failures against the cluster's broker nodes: node crashes, slow networks, network partitions, and so on.

      Finally, check whether the data actually written to the Kafka cluster matches expectations.

    • Test assumptions:

      Failed sends are not retried in these tests, to keep the result statistics clean; in production, retries are a mandatory part of the pipeline.

    • Starting the environment for the first time:

      blockade up

    • During the tests, ZooKeeper and Kafka both use local storage, kept under ~/ChaosTestingCode/Kafka/cluster/volumes/
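
To make the comparison step concrete, here is a minimal verification sketch. It is an illustration only, assuming the confluent-kafka-python client; the repository's own scripts (e.g. print-hw.sh) rely on kafka.tools.GetOffsetShell instead, and the bootstrap address below is just the one used by the cluster in these tests:

# Sketch only: compare the client-side acknowledged count with the
# partition high watermark, i.e. the number of messages Kafka stored.
from confluent_kafka import Consumer, TopicPartition

def stored_count(bootstrap, topic, partition=0):
    """High watermark of a single-partition topic; offsets start at 0,
    so the high watermark equals the number of stored messages."""
    c = Consumer({
        "bootstrap.servers": bootstrap,
        "group.id": "hw-check",  # required by the client, otherwise unused
    })
    low, high = c.get_watermark_offsets(TopicPartition(topic, partition),
                                        timeout=10)
    c.close()
    return high

def verify(acknowledged, bootstrap="172.17.0.3:9092", topic="test1"):
    stored = stored_count(bootstrap, topic)
    # Data loss by this article's definition: the producer was
    # acknowledged for more messages than Kafka actually kept.
    if acknowledged > stored:
        print(f"DATA LOSS: acked={acknowledged}, stored={stored}")
    else:
        print(f"OK: acked={acknowledged}, stored={stored}")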


Test Scenario 1

Scenario setup:

ACKS=0: the producer does not wait for any ACK from Kafka before treating a send as successful; during the run, the topic leader fails over.

Procedure:

The client sends 100000 messages to Kafka with acks set to 0; during the run, the topic leader is killed to force a failover.

Commands:

  • Version under test: confluentinc/cp-kafka:5.5.6

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          5da02d8887d5    UP      172.17.0.3      NORMAL                
kafka2          34238c04862a    UP      172.17.0.4      NORMAL                
kafka3          fd202d94bd05    UP      172.17.0.5      NORMAL                
zk1             61fc580b7a7e    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        
The current leader is node 2; simulate a crash of node 2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          5da02d8887d5    UP      172.17.0.3      NORMAL                
kafka2          34238c04862a    DOWN                    UNKNOWN               
kafka3          fd202d94bd05    UP      172.17.0.5      NORMAL                
zk1             61fc580b7a7e    UP      172.17.0.2      NORMAL             

The leader has failed over to node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3,1
        
Check the data actually stored in Kafka
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:99947

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 0 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%6|1638169471.598|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Disconnected (after 19087ms in state UP)
%3|1638169471.598|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Connect to ipv4#172.17.0.4:9092 failed: Connection refused (after 0ms in state CONNECT)
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0
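
The producer.py invocation above takes four positional arguments: message count, delay between sends in seconds, acks, and topic. As an illustration only (the repository's script is the authority), here is a minimal sketch of such a producer, assuming the confluent-kafka-python client; the %-prefixed log lines in the output above come from its underlying librdkafka:

# Sketch only, not the repository's actual producer.py.
# Usage: python producer.py <count> <delay_seconds> <acks> <topic>
import sys
import time
from confluent_kafka import Producer

count, delay = int(sys.argv[1]), float(sys.argv[2])
acks, topic = sys.argv[3], sys.argv[4]   # acks: "0", "1", or "all"

delivered = failed = 0

def on_delivery(err, msg):
    # Per-message delivery report; with acks=0 librdkafka reports success
    # as soon as the message is handed to the network layer.
    global delivered, failed
    if err is None:
        delivered += 1
    else:
        failed += 1
        print(err)

p = Producer({
    # Assumed addresses, matching the blockade cluster above.
    "bootstrap.servers": "172.17.0.3:9092,172.17.0.4:9092,172.17.0.5:9092",
    "acks": acks,
    "message.send.max.retries": 0,       # the tests deliberately never retry
})

for i in range(count):
    p.produce(topic, str(i).encode(), callback=on_delivery)
    p.poll(0)                            # serve pending delivery callbacks
    time.sleep(delay)
    if (i + 1) % 10000 == 0:
        print(f"Success: {delivered} Failed: {failed}")

p.flush(60)                              # wait for in-flight messages
print(f"Sent: {count}")
print(f"Delivered: {delivered}")
print(f"Failed: {failed}")

This also explains the scenario's outcome: with acks=0 the delivery callback fires successfully without any broker response, so the client counts 100000 successes even when some messages do not survive the failover.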

  • Version under test: confluentinc/cp-kafka:6.2.1

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Version under test: confluentinc/cp-kafka:5.5.6

    Despite the broken connection and the leader failover, the client reported zero failures for its 100000 sends, while Kafka actually wrote only 99947 messages: 53 messages were lost.

  • Version under test: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 results.


Test Scenario 2

Scenario setup:

ACKS=1: the producer treats a send as successful once it receives the ACK from the topic leader; during the run, the topic leader fails over.

Procedure:

The client sends 100000 messages to Kafka with acks set to 1; during the run, the topic leader is killed to force a failover.

Commands:

  • Version under test: confluentinc/cp-kafka:5.5.6

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          aa0f36c550e6    UP      172.17.0.3      NORMAL                
kafka2          7eaea8c1d88f    UP      172.17.0.4      NORMAL                
kafka3          d1489c809d78    UP      172.17.0.5      NORMAL                
zk1             1d533d285fec    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        
The current leader is node 3; simulate a crash of node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          aa0f36c550e6    UP      172.17.0.3      NORMAL                
kafka2          7eaea8c1d88f    UP      172.17.0.4      NORMAL                
kafka3          d1489c809d78    DOWN                    UNKNOWN               
zk1             1d533d285fec    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 3,1,2 Isr: 1,2
        
Check the amount of data actually stored in Kafka
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:99944

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
%4|1638172063.335|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected (after 17386ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%6|1638172063.336|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
Success: 39944 Failed: 56
Success: 49944 Failed: 56
Success: 59944 Failed: 56
Success: 69944 Failed: 56
Success: 79944 Failed: 56
Success: 89944 Failed: 56
Success: 99944 Failed: 56
Sent: 100000
Delivered: 99944
Failed: 56

  • Version under test: confluentinc/cp-kafka:6.2.1

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Version under test: confluentinc/cp-kafka:5.5.6

    Because of the broken connection and the leader failover, 56 of the client's 100000 sends failed; Kafka actually wrote 99944 messages. No data was lost.

  • Version under test: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 results.


Test Scenario 3

Scenario setup:

ACKS=all: the producer treats a send as successful only once the write has been acknowledged by all of the topic's in-sync replicas; during the run, the topic leader fails over.

Procedure:

The client sends 1000000 messages to Kafka from ten concurrent producer processes, with acks set to all; during the run, the topic leader is killed to force a failover.

Commands:

  • Version under test: confluentinc/cp-kafka:5.5.6

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          9e5f2c18923c    UP      172.17.0.3      NORMAL                
kafka2          f6bc773f5325    UP      172.17.0.4      NORMAL                
kafka3          3d6fa9e2182b    UP      172.17.0.5      NORMAL                
zk1             281f10b871ff    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 1,3,2 Isr: 3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:998903

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ concurrent-producer-silent.sh 100000 0.0001 all test1    
Runs complete
Acknowledged total: 998833
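
concurrent-producer-silent.sh drives ten producer processes in parallel and prints the summed acknowledged count. Its internals are not shown here; as a sketch of the same idea, under the same assumptions as the earlier producer sketch, ten processes could be run with Python's multiprocessing:

# Sketch: ten concurrent producers, summed acknowledged count.
from multiprocessing import Pool
from confluent_kafka import Producer

BOOTSTRAP = "172.17.0.3:9092,172.17.0.4:9092,172.17.0.5:9092"  # assumed

def run_producer(args):
    count, acks, topic = args
    acked = [0]                     # mutable cell for the closure below

    def on_delivery(err, msg):
        if err is None:
            acked[0] += 1

    p = Producer({"bootstrap.servers": BOOTSTRAP, "acks": acks,
                  "message.send.max.retries": 0})
    for i in range(count):
        while True:
            try:
                p.produce(topic, str(i).encode(), callback=on_delivery)
                break
            except BufferError:     # local queue full: drain callbacks
                p.poll(0.1)
        p.poll(0)
    p.flush(120)
    return acked[0]

if __name__ == "__main__":
    with Pool(10) as pool:
        totals = pool.map(run_producer, [(100000, "all", "test1")] * 10)
    print("Runs complete")
    print(f"Acknowledged total: {sum(totals)}")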

  • Version under test: confluentinc/cp-kafka:6.2.1

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

Window 1 commands (the blockade kill command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Version under test: confluentinc/cp-kafka:5.5.6

    After the leader failover Kafka actually held 998903 messages, while the clients were acknowledged for 998833. Kafka persisted more data than it acknowledged: 70 messages were reported to the clients as failed yet were still written to the cluster. No data was lost.

  • Version under test: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 results.


Test Scenario 4

Scenario setup:

ACKS=1: the producer treats a send as successful once it receives the ACK from the topic leader; during the run, the topic leader is network-partitioned from the other Kafka nodes and from ZooKeeper. The old leader can no longer communicate with the other Kafka nodes or ZooKeeper, but can still reach the clients.

Procedure:

The client sends 100000 messages to Kafka with acks set to 1; during the run, the topic leader is network-partitioned.

Commands:

  • Version under test: confluentinc/cp-kafka:5.5.6

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          d8552041dd61    UP      172.17.0.3      NORMAL                
kafka2          7a45eba05d55    UP      172.17.0.4      NORMAL                
kafka3          029c6d8ad695    UP      172.17.0.5      NORMAL                
zk1             6f3918e245ea    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        
The current leader is node 3; network-partition node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          d8552041dd61    UP      172.17.0.3      NORMAL     2          
kafka2          7a45eba05d55    UP      172.17.0.4      NORMAL     2          
kafka3          029c6d8ad695    UP      172.17.0.5      NORMAL     1          
zk1             6f3918e245ea    UP      172.17.0.2      NORMAL     2

The partitioned node 029c6d8ad695 can no longer communicate with any other node
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.2"            
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
^C
--- 172.17.0.2 ping statistics ---
4 packets transmitted, 0 received, 100% packet loss, time 3053ms

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.3"
PING 172.17.0.3 (172.17.0.3) 56(84) bytes of data.
^C
--- 172.17.0.3 ping statistics ---
4 packets transmitted, 0 received, 100% packet loss, time 3069ms

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.4"
PING 172.17.0.4 (172.17.0.4) 56(84) bytes of data.
^C
--- 172.17.0.4 ping statistics ---
6 packets transmitted, 0 received, 100% packet loss, time 5103ms

The remaining nodes have held a new election; the new leader is node 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 3,1,2 Isr: 1,2
        
Check the data the Kafka cluster actually received, from the new leader
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:50740

Logs from the old leader node:
The old leader, kafka3, can no longer communicate with the other nodes. It tries to shrink the ISR to 3, i.e. itself alone, but because it cannot reach ZooKeeper the shrink cannot succeed, and it keeps waiting for the ZooKeeper connection
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka3 | tail -50
[2021-11-30 08:33:03,949] WARN Client session timed out, have not heard from server in 19797ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:03,949] INFO Client session timed out, have not heard from server in 19797ms for sessionid 0x1000cfa943c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:05,713] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:17,254] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3. Leader: (highWatermark: 49804, endOffset: 100000). Out of sync replicas: (brokerId: 1, endOffset: 50740) (brokerId: 2, endOffset: 49804). (kafka.cluster.Partition)
[2021-11-30 08:33:22,056] WARN Client session timed out, have not heard from server in 18005ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:22,056] INFO Client session timed out, have not heard from server in 18005ms for sessionid 0x1000cfa943c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:22,157] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-11-30 08:33:23,200] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:41,205] WARN Client session timed out, have not heard from server in 19049ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
...

Logs from the other nodes:
Fetching from the old leader kafka3 errors out, so replication from it is stopped and the fetcher relationship is removed. A new election makes broker 1 the leader; it starts serving from offset 50740, and the leader epoch advances to epoch 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka1 | tail -50
 ...
[2021-11-30 08:28:57,308] INFO [RequestSendThread controllerId=1] Controller 1 connected to kafka2:19092 (id: 2 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,827] INFO [Controller id=1] Newly added brokers: , deleted brokers: 3, bounced brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
[2021-11-30 08:32:50,828] INFO [RequestSendThread controllerId=1] Shutting down (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,829] INFO [RequestSendThread controllerId=1] Stopped (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,829] INFO [RequestSendThread controllerId=1] Shutdown completed (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,832] INFO [Controller id=1] Broker failure callback for 3 (kafka.controller.KafkaController)
[2021-11-30 08:32:50,851] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,852] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 1 from offset 50740 with high watermark 49804. Previous leader epoch was 0. (kafka.cluster.Partition)
[2021-11-30 08:32:50,858] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1277039458, epoch=11432) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,864] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,882] INFO [Controller id=1] Updated broker epochs cache: Map(1 -> 26, 2 -> 53) (kafka.controller.KafkaController)
[2021-11-30 08:32:56,368] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-11-30 08:32:56,369] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions  triggered by AutoTriggered (kafka.controller.KafkaController)
[2021-11-30 08:37:51,249] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-11-30 08:37:56,370] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-11-30 08:37:56,370] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions  triggered by AutoTriggered (kafka.controller.KafkaController)

Broker 2 stops replicating from the old leader, broker 3, and starts replicating from the new leader, broker 1, at offset 49794
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka2 | tail -50
...
[2021-11-30 08:32:50,859] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=49794, leaderEpoch=1)) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,864] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,865] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,868] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1851347235, epoch=11294) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-11-30 08:32:50,871] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,871] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,883] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to 49804 has no effect as the largest offset in the log is 49803 (kafka.log.Log)
[2021-11-30 08:38:01,701] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0

  • Version under test: confluentinc/cp-kafka:6.2.1

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Version under test: confluentinc/cp-kafka:5.5.6

    The original topic leader, broker 3, was network-partitioned while data was being written. Unaware that it had lost contact with the other nodes and ZooKeeper, it kept serving the producer's write requests, and the producer received no failure notifications. Kafka ultimately held 50740 messages and 0 sends were reported failed, so 100000 - 50740 - 0 = 49260 messages were lost. With acks=1, a network-partitioned leader broker causes massive data loss.

  • Version under test: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 results.


Test Scenario 5

Scenario setup:

ACKS=all: the producer treats a send as successful only once the write has been acknowledged by all of the topic's in-sync replicas; during the run, the topic leader is network-partitioned from the other Kafka nodes and from ZooKeeper. The old leader can no longer communicate with the other Kafka nodes or ZooKeeper, but can still reach the clients.

Procedure:

The client sends 100000 messages to Kafka with acks set to all; during the run, the topic leader is network-partitioned.

Commands:

  • Version under test: confluentinc/cp-kafka:5.5.6

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          5c098b68a8d9    UP      172.17.0.3      NORMAL                
kafka2          62a38ec7c939    UP      172.17.0.4      NORMAL                
kafka3          f965e49b96bd    UP      172.17.0.5      NORMAL                
zk1             6a94658d45bf    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          5c098b68a8d9    UP      172.17.0.3      NORMAL     2          
kafka2          62a38ec7c939    UP      172.17.0.4      NORMAL     2          
kafka3          f965e49b96bd    UP      172.17.0.5      NORMAL     1          
zk1             6a94658d45bf    UP      172.17.0.2      NORMAL     2          
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 3,1,2 Isr: 1,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:36577

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka3 | tail -50
[2021-12-01 06:18:34,639] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:18:34,706] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-01 06:18:34,712] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 37 ms (kafka.log.Log)
[2021-12-01 06:18:34,719] INFO Created log for partition test1-0 in /var/lib/kafka/data/test1-0 with properties {} (kafka.log.LogManager)
[2021-12-01 06:18:34,719] INFO [Partition test1-0 broker=3] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2021-12-01 06:18:34,720] INFO [Partition test1-0 broker=3] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-01 06:18:34,722] INFO [Partition test1-0 broker=3] test1-0 starts at leader epoch 0 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-01 06:18:35,814] INFO I/O exception (java.net.NoRouteToHostException) caught when processing request to {}->http://support-metrics.confluent.io:80: No route to host (Host unreachable) (org.apache.http.impl.execchain.RetryExec)
[2021-12-01 06:18:35,814] INFO Retrying request to {}->http://support-metrics.confluent.io:80 (org.apache.http.impl.execchain.RetryExec)
[2021-12-01 06:18:38,887] ERROR Could not submit metrics to Confluent: No route to host (Host unreachable) (io.confluent.support.metrics.utils.WebClient)
[2021-12-01 06:18:38,887] ERROR Failed to submit metrics to Confluent via insecure endpoint=http://support-metrics.confluent.io/anon -- giving up (io.confluent.support.metrics.submitters.ConfluentSubmitter)
[2021-12-01 06:20:04,723] WARN Client session timed out, have not heard from server in 12006ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:04,723] INFO Client session timed out, have not heard from server in 12006ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:06,760] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:24,778] WARN Client session timed out, have not heard from server in 19954ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:24,778] INFO Client session timed out, have not heard from server in 19954ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:26,534] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:28,772] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3. Leader: (highWatermark: 36577, endOffset: 36613). Out of sync replicas: (brokerId: 1, endOffset: 36577) (brokerId: 2, endOffset: 36597). (kafka.cluster.Partition)
[2021-12-01 06:20:42,894] WARN Client session timed out, have not heard from server in 18014ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:42,894] INFO Client session timed out, have not heard from server in 18014ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:42,996] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-01 06:20:44,577] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:02,592] WARN Client session timed out, have not heard from server in 19597ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:02,592] INFO Client session timed out, have not heard from server in 19597ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:03,809] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
...

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka1 | tail -50
...
[2021-12-01 06:20:12,383] INFO [Controller id=1] Newly added brokers: , deleted brokers: 3, bounced brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Shutting down (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Stopped (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,388] INFO [Controller id=1] Broker failure callback for 3 (kafka.controller.KafkaController)
[2021-12-01 06:20:12,408] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,414] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 1 from offset 36577 with high watermark 36577. Previous leader epoch was 0. (kafka.cluster.Partition)
[2021-12-01 06:20:12,421] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,427] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1322674560, epoch=8051) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-01 06:20:12,428] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,428] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,436] INFO [Controller id=1] Updated broker epochs cache: Map(1 -> 26, 2 -> 53) (kafka.controller.KafkaController)
[2021-12-01 06:22:52,664] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-12-01 06:22:52,665] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions  triggered by AutoTriggered (kafka.controller.KafkaController)

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka2 | tail -50
...
[2021-12-01 06:20:12,412] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,420] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=36577, leaderEpoch=1)) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,420] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,421] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,423] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=347527133, epoch=7998) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-01 06:20:12,426] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,437] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to offset 36577 (kafka.log.Log)
[2021-12-01 06:20:12,439] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Scheduling segments for deletion List() (kafka.log.Log)
[2021-12-01 06:20:12,440] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,444] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 36577 with message format version 2 (kafka.log.Log)
[2021-12-01 06:20:12,445] INFO [ProducerStateManager partition=test1-0] Writing producer snapshot at offset 36577 (kafka.log.ProducerStateManager)
[2021-12-01 06:21:12,440] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Deleting segments List() (kafka.log.Log)

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 36577 Failed: 63423
Sent: 100000
Delivered: 36577
Failed: 63423

  • Version under test: confluentinc/cp-kafka:6.2.1

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          2642be3c590c    UP      172.17.0.3      NORMAL                
kafka2          b226d706b431    UP      172.17.0.4      NORMAL                
kafka3          6b225d0834be    UP      172.17.0.5      NORMAL                
zk1             dd0128271974    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    TopicId: nO2rXLtXQue3-tqS5skJiQ PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          2642be3c590c    UP      172.17.0.3      NORMAL     1          
kafka2          b226d706b431    UP      172.17.0.4      NORMAL     2          
kafka3          6b225d0834be    UP      172.17.0.5      NORMAL     2          
zk1             dd0128271974    UP      172.17.0.2      NORMAL     2          
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1    TopicId: nO2rXLtXQue3-tqS5skJiQ PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 1,3,2 Isr: 3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:89649

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60350ms, timeout #0)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60344ms, timeout #1)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60338ms, timeout #2)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60332ms, timeout #3)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60327ms, timeout #4)
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 59638 Failed: 10362
Success: 69638 Failed: 10362
Success: 79638 Failed: 10362
Success: 89638 Failed: 10362
Sent: 100000
Delivered: 89638
Failed: 10362

Scenario conclusions:

  • Version under test: confluentinc/cp-kafka:5.5.6

    Unlike Scenario 4 with acks=1, when the leader is network-partitioned the producer now receives explicit send-failure notifications.

    The producer was acknowledged for 36577 messages and Kafka received 36577 messages; no data was lost.

    With ACKS set to all, whether the leader fails over because of a node crash or a network partition, data loss caused by the failover is effectively prevented.

  • Version under test: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

    This version completes the failover after the leader partition faster than the two earlier versions, so the client managed to deliver more data to Kafka. Otherwise the results match confluentinc/cp-kafka:5.5.6.


Test Scenario 6

Scenario setup:

ACKS=1: the producer treats a send as successful once it receives the ACK from the topic leader; during the run, the topic leader is network-partitioned from ZooKeeper. The old leader can no longer communicate with ZooKeeper, but can still reach the clients.

Procedure:

The client sends 100000 messages to Kafka with acks set to 1; during the run, the topic leader is network-partitioned from ZooKeeper.

Commands:

  • Version under test: confluentinc/cp-kafka:5.5.6

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          b3d312073984    UP      172.17.0.3      NORMAL                
kafka2          563f88a79dc1    UP      172.17.0.4      NORMAL                
kafka3          5073f138ee27    UP      172.17.0.5      NORMAL                
zk1             3101398f990c    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        
The current leader is node 2. Partition the network into two groups, one containing kafka1, kafka2, kafka3 and the other containing kafka1, kafka3, zk1; the leader therefore loses contact only with ZooKeeper, and all its other communication stays normal
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1,kafka2,kafka3 kafka1,kafka3,zk1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          b3d312073984    UP      172.17.0.3      NORMAL     2          
kafka2          563f88a79dc1    UP      172.17.0.4      NORMAL     1          
kafka3          5073f138ee27    UP      172.17.0.5      NORMAL     3          
zk1             3101398f990c    UP      172.17.0.2      NORMAL     4   

After the partition, the old leader has lost contact with ZooKeeper only
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.2"            
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
^C
--- 172.17.0.2 ping statistics ---
6 packets transmitted, 0 received, 100% packet loss, time 5111ms

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.3"
PING 172.17.0.3 (172.17.0.3) 56(84) bytes of data.
64 bytes from 172.17.0.3: icmp_seq=1 ttl=64 time=0.049 ms
64 bytes from 172.17.0.3: icmp_seq=2 ttl=64 time=0.055 ms
64 bytes from 172.17.0.3: icmp_seq=3 ttl=64 time=0.056 ms
^C
--- 172.17.0.3 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2052ms
rtt min/avg/max/mdev = 0.049/0.053/0.056/0.006 ms
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.5"
PING 172.17.0.5 (172.17.0.5) 56(84) bytes of data.
64 bytes from 172.17.0.5: icmp_seq=1 ttl=64 time=0.056 ms
64 bytes from 172.17.0.5: icmp_seq=2 ttl=64 time=0.080 ms
64 bytes from 172.17.0.5: icmp_seq=3 ttl=64 time=0.109 ms
^C
--- 172.17.0.5 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2037ms
rtt min/avg/max/mdev = 0.056/0.081/0.109/0.024 ms

The remaining followers hold a new election; the new leader is node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3,1
      
Check the data Kafka actually kept, from the new leader
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:78427

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1  
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0

  • Version under test: confluentinc/cp-kafka:6.2.1

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Version under test: confluentinc/cp-kafka:5.5.6

    This scenario resembles Scenario 4: once the old leader loses contact with ZooKeeper, ZooKeeper marks it offline and a new election is triggered. The old leader keeps accepting write requests and tries to shrink the ISR to itself alone, but the shrink cannot succeed because it cannot reach ZooKeeper.

    The difference is why the other Kafka nodes stop sending fetch requests to the old leader: not because they cannot reach it, but because a new leader has been elected.

    The producer was acknowledged for 100000 messages while Kafka received only 78427: 21573 messages were lost.

  • Version under test: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 results.


Test Scenario 7

Scenario setup:

ACKS=all: the producer treats a send as successful only once the write has been acknowledged by all of the topic's in-sync replicas; during the run, the topic leader is network-partitioned from ZooKeeper.

Procedure:

The client sends 100000 messages to Kafka with acks set to all; during the run, the topic leader is network-partitioned from ZooKeeper.

Commands:

  • Version under test: confluentinc/cp-kafka:5.5.6

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          825c158231f4    UP      172.17.0.3      NORMAL                
kafka2          fe64d2227f35    UP      172.17.0.4      NORMAL                
kafka3          4f57c56a3893    UP      172.17.0.5      NORMAL                
zk1             bf63b423c3ea    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
        
The current leader is node 1. Partition the network into two groups, one containing kafka1, kafka2, kafka3 and the other containing kafka2, kafka3, zk1; the leader therefore loses contact only with ZooKeeper, and all its other communication stays normal
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1,kafka2,kafka3 kafka2,kafka3,zk1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          825c158231f4    UP      172.17.0.3      NORMAL     1          
kafka2          fe64d2227f35    UP      172.17.0.4      NORMAL     2          
kafka3          4f57c56a3893    UP      172.17.0.5      NORMAL     3          
zk1             bf63b423c3ea    UP      172.17.0.2      NORMAL     4  

The remaining followers hold a new election; the new leader is node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 1,3,2 Isr: 3,2
     
Check the data Kafka actually stored, from the new leader
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:90057

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60167ms, timeout #0)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60162ms, timeout #1)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60156ms, timeout #2)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60150ms, timeout #3)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60144ms, timeout #4)
%4|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 29 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1638348015.783|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 29 request(s) timed out: disconnect (after 92087ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
...
Success: 70043 Failed: 9957
Success: 80043 Failed: 9957
Success: 90043 Failed: 9957
Sent: 100000
Delivered: 90043
Failed: 9957

  • Version under test: confluentinc/cp-kafka:6.2.1

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

Window 1 commands (the blockade partition command is issued while window 2 is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window 2 commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Version under test: confluentinc/cp-kafka:5.5.6

    This scenario resembles Scenario 5: once the old leader loses contact with ZooKeeper, ZooKeeper marks it offline and a new election is triggered. The old leader keeps accepting write requests and tries to shrink the ISR to itself alone, but the shrink cannot succeed because it cannot reach ZooKeeper.

    The difference is why the other Kafka nodes stop sending fetch requests to the old leader: not because they cannot communicate with it, but because a new leader has been elected.

    The producer was acknowledged for 90043 messages and Kafka received 90057 (as in Scenario 3, the extra 14 were written but never acknowledged); no data was lost.

  • Version under test: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Version under test: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 results.


Environment Preparation

  • Test assumptions

    The preceding tests show that both acks=0 and acks=1 can lose data that was written to Kafka.

    An unclean leader election necessarily loses data as well; for details see: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html

    All remaining tests therefore use the following settings (a configuration sketch follows this list):

    • acks: all

    • unclean.leader.election.enable: false (the default in recent versions)
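
As an illustration of these settings, here is a sketch of creating the test topic with unclean leader election disabled, assuming confluent-kafka-python's AdminClient (the repository's create-topic.sh presumably wraps the kafka-topics CLI instead; acks is a producer setting, not a topic one):

# Sketch: create the test topic with the durability-oriented configs.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "172.17.0.3:9092"})  # assumed

topic = NewTopic(
    "test1",
    num_partitions=1,
    replication_factor=3,
    config={
        # Never elect an out-of-sync replica as leader; this rules out
        # the guaranteed data loss of an unclean failover.
        "unclean.leader.election.enable": "false",
        # Left at 1 for Scenario 8; raising it to 2 would make acks=all
        # writes fail outright instead of silently degenerating to
        # acks=1 once the ISR shrinks to the leader alone.
        "min.insync.replicas": "1",
    },
)

# create_topics() returns {topic name: future}; result() raises on failure.
for name, future in admin.create_topics([topic]).items():
    future.result()
    print(f"Created topic {name}")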


Test Scenario 8

Scenario setup:

With acks=all, unclean.leader.election.enable=false, and min.insync.replicas=1, all followers are evicted from the ISR for being slow, so acks=all effectively degenerates to acks=1 (the leader alone satisfies min.insync.replicas). This scenario tests what happens if the leader then fails.

Procedure:

The client sends several batches of 1000000 messages to Kafka with acks set to all. During the run, follower slowness is simulated until the ISR shrinks to the leader alone; the topic leader is then killed to force a failover, and afterwards recovered.

Commands:

  • Version under test: confluentinc/cp-kafka:5.5.6

Window 1 commands (the blockade slow and blockade kill commands are issued while window 2 is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          161df85bc82b    UP      172.17.0.3      NORMAL                
kafka2          42d93122f5c8    UP      172.17.0.4      NORMAL                
kafka3          dfe863b7190e    UP      172.17.0.5      NORMAL                
zk1             e4ac0f4e41c1    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

The leader is node 1, so nodes 2 and 3 are made slow to simulate sluggish followers: the leader sees random network delays to nodes 2 and 3, while its communication with ZooKeeper stays normal
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka2 kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.2"            
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
64 bytes from 172.17.0.2: icmp_seq=1 ttl=64 time=0.047 ms
64 bytes from 172.17.0.2: icmp_seq=2 ttl=64 time=0.050 ms
64 bytes from 172.17.0.2: icmp_seq=3 ttl=64 time=0.053 ms
64 bytes from 172.17.0.2: icmp_seq=4 ttl=64 time=0.137 ms
64 bytes from 172.17.0.2: icmp_seq=5 ttl=64 time=0.053 ms
64 bytes from 172.17.0.2: icmp_seq=6 ttl=64 time=0.055 ms
64 bytes from 172.17.0.2: icmp_seq=7 ttl=64 time=0.057 ms
^C
--- 172.17.0.2 ping statistics ---
7 packets transmitted, 7 received, 0% packet loss, time 6132ms
rtt min/avg/max/mdev = 0.047/0.064/0.137/0.030 ms
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.4"
PING 172.17.0.4 (172.17.0.4) 56(84) bytes of data.
64 bytes from 172.17.0.4: icmp_seq=1 ttl=64 time=0.047 ms
64 bytes from 172.17.0.4: icmp_seq=2 ttl=64 time=1727 ms
64 bytes from 172.17.0.4: icmp_seq=4 ttl=64 time=1206 ms
64 bytes from 172.17.0.4: icmp_seq=5 ttl=64 time=359 ms
64 bytes from 172.17.0.4: icmp_seq=3 ttl=64 time=3054 ms
64 bytes from 172.17.0.4: icmp_seq=6 ttl=64 time=1432 ms
64 bytes from 172.17.0.4: icmp_seq=8 ttl=64 time=0.059 ms
64 bytes from 172.17.0.4: icmp_seq=7 ttl=64 time=1639 ms
64 bytes from 172.17.0.4: icmp_seq=9 ttl=64 time=873 ms
^C
--- 172.17.0.4 ping statistics ---
10 packets transmitted, 9 received, 10% packet loss, time 9056ms
rtt min/avg/max/mdev = 0.047/1143.632/3054.774/920.712 ms, pipe 4
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 161df85bc82b bash -c "ping 172.17.0.5"
PING 172.17.0.5 (172.17.0.5) 56(84) bytes of data.
64 bytes from 172.17.0.5: icmp_seq=1 ttl=64 time=507 ms
64 bytes from 172.17.0.5: icmp_seq=3 ttl=64 time=0.054 ms
64 bytes from 172.17.0.5: icmp_seq=4 ttl=64 time=45.6 ms
64 bytes from 172.17.0.5: icmp_seq=2 ttl=64 time=2179 ms
64 bytes from 172.17.0.5: icmp_seq=5 ttl=64 time=0.058 ms
64 bytes from 172.17.0.5: icmp_seq=6 ttl=64 time=824 ms
64 bytes from 172.17.0.5: icmp_seq=7 ttl=64 time=17.7 ms
64 bytes from 172.17.0.5: icmp_seq=8 ttl=64 time=0.051 ms
64 bytes from 172.17.0.5: icmp_seq=9 ttl=64 time=215 ms
^C
--- 172.17.0.5 ping statistics ---
10 packets transmitted, 9 received, 10% packet loss, time 9066ms
rtt min/avg/max/mdev = 0.051/421.125/2179.973/678.343 ms, pipe 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          161df85bc82b    UP      172.17.0.3      NORMAL                
kafka2          42d93122f5c8    UP      172.17.0.4      SLOW                  
kafka3          dfe863b7190e    UP      172.17.0.5      SLOW                  
zk1             e4ac0f4e41c1    UP      172.17.0.2      NORMAL                

Watch the ISR until it shrinks to the leader alone; meanwhile the client keeps writing data until it exits with errors
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka1 test1
...
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
^C
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
        
Once the ISR has shrunk to the leader alone, the client can still write normally; now simulate a leader crash.
After the leader fails no new leader can be elected: a clean failover is impossible and unclean election is disabled, so data can be neither written nor read. The leader shows as none, the ISR stays as it was, and the producer hangs for a while before failing with timeouts.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka1 test1
Error: No such container: kafka-topics
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka2 test1
[2021-12-04 02:12:32,587] WARN [AdminClient clientId=adminclient-1] Connection to node 1 (/172.17.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: none    Replicas: 1,2,3 Isr: 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka3 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: none    Replicas: 1,2,3 Isr: 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-controller.sh kafka2 test1
2
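
The show-leader.sh and print-topic-details.sh helpers above poll the topic metadata. For reference, a minimal Python sketch of the same leader/ISR check, assuming the confluent_kafka package that producer.py already uses (broker address and topic name are placeholders):

# watch_isr.py - poll leader/ISR for test1, in the spirit of show-leader.sh
from time import sleep
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:19092'})

while True:
    md = admin.list_topics(topic='test1', timeout=10)
    for p in md.topics['test1'].partitions.values():
        # leader == -1 corresponds to the "Leader: none" state shown above
        print(f"partition={p.id} leader={p.leader} isrs={p.isrs}")
    sleep(1)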

Restart and recover the original leader.
After the original leader comes back, the previously written data is readable again and nothing is lost; if the leader could not be recovered, all of the data would be lost.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
update-hosts.sh
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          161df85bc82b    UP      172.17.0.3      NORMAL                
kafka2          42d93122f5c8    UP      172.17.0.4      SLOW                  
kafka3          dfe863b7190e    UP      172.17.0.5      SLOW                  
zk1             e4ac0f4e41c1    UP      172.17.0.2      NORMAL             
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:164834
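
print-hw.sh reads the partition high watermark. The same number can be fetched in Python with a consumer's get_watermark_offsets; a sketch assuming confluent_kafka (broker address and group id are placeholders):

from confluent_kafka import Consumer, TopicPartition

c = Consumer({'bootstrap.servers': 'localhost:19092', 'group.id': 'hw-check'})
# low = log start offset, high = high watermark (offset of the next write)
low, high = c.get_watermark_offsets(TopicPartition('test1', 0), timeout=10)
print(f"test1:0:{high}")
c.close()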

Restore the followers' network. Once all follower networks are back to normal, they catch up from the leader and the ISR returns to full strength.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka2 kafka3   
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          161df85bc82b    UP      172.17.0.3      NORMAL                
kafka2          42d93122f5c8    UP      172.17.0.4      NORMAL                
kafka3          dfe863b7190e    UP      172.17.0.5      NORMAL                
zk1             e4ac0f4e41c1    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        
Simulate a leader failure again. This time a normal failover is possible: a small number of writes fail during the switch, writes resume once it completes, the failed node is removed from the ISR, and no data is lost. The leader is now broker 2.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          161df85bc82b    DOWN                    UNKNOWN               
kafka2          42d93122f5c8    UP      172.17.0.4      NORMAL                
kafka3          dfe863b7190e    UP      172.17.0.5      NORMAL                
zk1             e4ac0f4e41c1    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka2 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 2       Replicas: 1,2,3 Isr: 2,3
        
After the failed leader is recovered, it rejoins the ISR automatically and syncs data from the new leader.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
update-hosts.sh
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          161df85bc82b    UP      172.17.0.3      NORMAL                
kafka2          42d93122f5c8    UP      172.17.0.4      NORMAL                
kafka3          dfe863b7190e    UP      172.17.0.5      NORMAL                
zk1             e4ac0f4e41c1    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka2 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 2       Replicas: 1,2,3 Isr: 2,3,1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:1164784
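
print-hw.sh only reports the high watermark; to confirm that the committed messages are actually readable after the failover, a consumer can count them from the beginning. A minimal sketch under the same assumptions (confluent_kafka, placeholder addresses):

from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

c = Consumer({'bootstrap.servers': 'localhost:19092',
              'group.id': 'verify', 'enable.auto.commit': False})
c.assign([TopicPartition('test1', 0, OFFSET_BEGINNING)])

count = 0
while True:
    msg = c.poll(5.0)
    if msg is None:      # nothing new within the timeout: assume end of log
        break
    if msg.error() is None:
        count += 1
print(f"readable messages: {count}")
c.close()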

Window two commands (run after the Kafka cluster setup and topic creation are complete):

Writing data while nodes 2 and 3 suffer slow networking; the producer eventually times out and exits.
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60755ms, timeout #0)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60750ms, timeout #1)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60744ms, timeout #2)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60739ms, timeout #3)
%5|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60733ms, timeout #4)
%4|1638583780.169|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 125 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
%3|1638583780.169|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 125 request(s) timed out: disconnect (after 77079ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
... ...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Traceback (most recent call last):
  File "producer.py", line 65, in <module>
    p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 147662
Delivered: 37504
Failed: 10158
%4|1638583794.349|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (547663 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
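
The BufferError above is the client's local queue filling up while requests time out. producer.py deliberately aborts at that point; production code would normally drain delivery reports and retry the produce call. A hedged sketch of that backpressure loop (not the author's script; addresses are placeholders):

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:19092', 'acks': 'all'})

def delivery_report(err, msg):
    if err is not None:
        print(f"delivery failed: {err}")

for i in range(1000000):
    while True:
        try:
            p.produce('test1', str(i).encode('utf-8'), callback=delivery_report)
            break
        except BufferError:
            p.poll(0.5)   # local queue full: serve callbacks, then retry
    p.poll(0)             # serve delivery callbacks as we go
p.flush()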

Writing data while the ISR has shrunk to the leader alone and the leader is then killed; after the leader fails, no more data can be written.
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
%4|1638583911.382|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected (after 75954ms in state UP)
%3|1638583911.382|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: Connection refused (after 0ms in state CONNECT)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%3|1638583978.567|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: No route to host (after 66977ms in state CONNECT)
Traceback (most recent call last):
  File "producer.py", line 65, in <module>
    p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 207350
Delivered: 107307
Failed: 43
%4|1638583985.412|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (600000 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

With all networks back to normal and the ISR restored to three nodes, the leader is killed again. A normal failover takes place; only a small number of sends fail, and no data is lost.
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%4|1638584478.892|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected (after 21710ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%6|1638584478.893|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
Success: 49935 Failed: 65
Success: 59935 Failed: 65
Success: 69935 Failed: 65
Success: 79935 Failed: 65
Success: 89935 Failed: 65
Success: 99935 Failed: 65
Success: 109935 Failed: 65
Success: 119935 Failed: 65
Success: 129935 Failed: 65
Success: 139935 Failed: 65
Success: 149935 Failed: 65
Success: 159935 Failed: 65
Success: 169935 Failed: 65
%3|1638584547.911|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: No route to host (after 68719ms in state CONNECT)
... ...
Success: 999935 Failed: 65
Sent: 1000000
Delivered: 999935
Failed: 65
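
The 65 failures are messages that were in flight when the leader died. These tests intentionally run without retries (see the test preconditions); with retries enabled, such transient failover errors would normally be retried and eventually delivered. A hedged config sketch, assuming librdkafka-style option names:

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'localhost:19092',
    'acks': 'all',
    'enable.idempotence': True,    # retries without duplication or reordering
    'message.timeout.ms': 120000,  # upper bound on send plus retries
})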

  • Test version: confluentinc/cp-kafka:6.2.1

Window one commands (the blockade slow and blockade kill commands are issued while window two is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window two commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Test version: confluentinc/cp-kafka:7.0.0

Window one commands (the blockade slow and blockade kill commands are issued while window two is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window two commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    With acks=all, unclean.leader.election.enable=false and min.insync.replicas=1, all followers are evicted from the ISR for being slow and the ISR shrinks to the leader alone, so acks=all effectively degrades to acks=1. Client writes fail before the degradation completes, then succeed again afterwards.

    If the leader then fails, no failover takes place because unclean.leader.election.enable=false, and the data can be neither written nor read.

    The only option is to wait for the original leader to recover; if it cannot be recovered, all of the data is lost.

    After the original leader recovers, reads and writes work normally again. Once the slow-network problem is resolved, a later leader failure triggers a normal failover, and the recovered leader can rejoin the cluster with no data loss.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results.

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results.


Test scenario 9

Scenario setup:

Building on scenario 8, set min.insync.replicas=2 so that every acknowledged write must be held consistently by at least two brokers (a creation sketch follows).
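
create-topic-min-insync.sh presumably applies this as a topic-level config; an equivalent hedged sketch with the confluent_kafka AdminClient (broker address is a placeholder):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:19092'})
topic = NewTopic('test1', num_partitions=1, replication_factor=3,
                 config={'min.insync.replicas': '2'})
for name, future in admin.create_topics([topic]).items():
    future.result()   # raises if creation failed
    print(f"created {name}")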

Procedure:

The client sends 1,000,000 messages to Kafka several times with acks=all. We first slow the followers so that the ISR shrinks to the leader alone, then kill the topic leader to force a failover, and finally recover it.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Window one commands (the blockade slow and blockade kill commands are issued while window two is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic-min-insync.sh kafka1 test1 2
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3a0f0cb52624    UP      172.17.0.3      NORMAL                
kafka2          7893d611b79b    UP      172.17.0.4      NORMAL                
kafka3          4dfd87691f46    UP      172.17.0.5      NORMAL                
zk1             06b129f53213    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
        
The current leader is broker 3. Simulate slow networking on nodes 1 and 2 so that the ISR shrinks to the leader alone.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka1 kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3a0f0cb52624    UP      172.17.0.3      SLOW                  
kafka2          7893d611b79b    UP      172.17.0.4      SLOW                  
kafka3          4dfd87691f46    UP      172.17.0.5      NORMAL                
zk1             06b129f53213    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka3 test1
...
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3
^C

While the leader is still alive but the ISR has shrunk to the leader alone, writes fail because min.insync.replicas=2 is no longer satisfied, producing code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas". Next, simulate a crash of the leader node.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3a0f0cb52624    UP      172.17.0.3      SLOW                  
kafka2          7893d611b79b    UP      172.17.0.4      SLOW                  
kafka3          4dfd87691f46    DOWN                    UNKNOWN               
zk1             06b129f53213    UP      172.17.0.2      NORMAL     

After the leader fails, no new leader can be elected: data can be neither written nor read, the ISR stays unchanged, and the producer hangs for a while before timing out. Recover the original leader.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3a0f0cb52624    UP      172.17.0.3      SLOW                  
kafka2          7893d611b79b    UP      172.17.0.4      SLOW                  
kafka3          4dfd87691f46    UP      172.17.0.5      NORMAL                
zk1             06b129f53213    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka3 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3
        
After the original leader comes back, the previously written data is available again and nothing is lost; if the leader could not be recovered, all of the data would be lost.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:59063

Once all follower networks are back to normal, they catch up from the leader and the ISR returns to full strength; with at least two nodes in the ISR, min.insync.replicas=2 is satisfied and writes succeed again.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka1 kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3a0f0cb52624    UP      172.17.0.3      NORMAL                
kafka2          7893d611b79b    UP      172.17.0.4      NORMAL                
kafka3          4dfd87691f46    UP      172.17.0.5      NORMAL                
zk1             06b129f53213    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka3 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1
        
Now kill the leader again. A failover takes place: a small number of writes fail during the switch, writes resume once it completes, the failed node is removed from the ISR, and no data is lost.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3a0f0cb52624    UP      172.17.0.3      NORMAL                
kafka2          7893d611b79b    UP      172.17.0.4      NORMAL                
kafka3          4dfd87691f46    DOWN                    UNKNOWN               
zk1             06b129f53213    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka2 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 2       Replicas: 3,2,1 Isr: 2,1
        
Then kill the new leader as well. The ISR shrinks to the sole surviving leader, and writes fail because min.insync.replicas=2 is not satisfied, again producing code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas". At this point the data can only be read.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3a0f0cb52624    UP      172.17.0.3      NORMAL                
kafka2          7893d611b79b    DOWN                    UNKNOWN               
kafka3          4dfd87691f46    DOWN                    UNKNOWN               
zk1             06b129f53213    UP      172.17.0.2      NORMAL             
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 3,2,1 Isr: 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:209578

Window two commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60988ms, timeout #0)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60981ms, timeout #1)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60975ms, timeout #2)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60969ms, timeout #3)
%5|1638601273.277|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Timed out ProduceRequest in flight (after 60963ms, timeout #4)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
... 
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
...
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
^CTraceback (most recent call last):
  File "producer.py", line 68, in <module>
    sleep(wait_period)
KeyboardInterrupt
Sent: 9566
Delivered: 0
Failed: 9556
%4|1638601352.387|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 10 messages (40 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%3|1638601448.775|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/bootstrap: Connect to ipv4#172.17.0.5:9092 failed: No route to host (after 18562ms in state CONNECT)
Traceback (most recent call last):
  File "producer.py", line 65, in <module>
    p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 100000
Delivered: 0
Failed: 0
%4|1638601500.477|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (488895 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%4|1638601740.244|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected (after 22236ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
...
%6|1638601740.246|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY)
Success: 49949 Failed: 51
Success: 59949 Failed: 51
Success: 69949 Failed: 51
Success: 79949 Failed: 51
Success: 89949 Failed: 51
Success: 99949 Failed: 51
Success: 109949 Failed: 51
Success: 119949 Failed: 51
Success: 129949 Failed: 51
Success: 139949 Failed: 51
Success: 149949 Failed: 51
%4|1638601791.642|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Disconnected (after 31598ms in state UP)
%3|1638601791.643|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Connect to ipv4#172.17.0.4:9092 failed: Connection refused (after 0ms in state CONNECT)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
...
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
KafkaError{code=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"}
^CTraceback (most recent call last):
  File "producer.py", line 68, in <module>
    sleep(wait_period)
KeyboardInterrupt
Sent: 282974
Delivered: 150515
Failed: 132456
%4|1638601861.504|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 3 messages (18 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
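
Note the two error classes in the output above: _MSG_TIMED_OUT and _TRANSPORT are local client errors, while NOT_ENOUGH_REPLICAS is a retriable broker-side rejection: the write is refused before anything is stored, so a retrying producer would lose nothing here. A sketch of telling them apart in the delivery callback (assuming confluent_kafka):

from confluent_kafka import KafkaError

def delivery_report(err, msg):
    if err is None:
        return
    if err.code() == KafkaError.NOT_ENOUGH_REPLICAS:
        # ISR < min.insync.replicas: broker rejected the write; retry later
        print(f"rejected, retriable={err.retriable()}: {err}")
    else:
        print(f"failed: {err}")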

  • Test version: confluentinc/cp-kafka:6.2.1

Window one commands (the blockade slow and blockade kill commands are issued while window two is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window two commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Test version: confluentinc/cp-kafka:7.0.0

Window one commands (the blockade slow and blockade kill commands are issued while window two is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window two commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    This scenario behaves like scenario 8, except that once the ISR holds fewer than two brokers, data can no longer be written, only read.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results.

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results.


Test scenario 10

Scenario setup:

Building on scenario 9: the original leader suffers an unrecoverable disk failure and rejoins the cluster having lost all of its data.

Procedure:

The client sends 1,000,000 messages to Kafka several times with acks=all. We first slow the followers so that the ISR shrinks to the leader alone, then kill the topic leader to force a failover. While recovering the original leader, we simulate an unrecoverable disk failure that wipes all of its data.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Window one commands (the blockade slow and blockade kill commands are issued while window two is writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic-min-insync.sh kafka1 test1 2
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3d3dacd9b095    UP      172.17.0.3      NORMAL                
kafka2          931bd63b61e2    UP      172.17.0.4      NORMAL                
kafka3          955172d1a496    UP      172.17.0.5      NORMAL                
zk1             94404576cf08    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        
The current leader is broker 1, and a large amount of data has already been written to the cluster.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:109498

Simulate slowness on the follower nodes (blockade slow degrades their network).
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade slow kafka2 kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3d3dacd9b095    UP      172.17.0.3      NORMAL                
kafka2          931bd63b61e2    UP      172.17.0.4      SLOW                  
kafka3          955172d1a496    UP      172.17.0.5      SLOW                  
zk1             94404576cf08    UP      172.17.0.2      NORMAL      

Eventually the ISR shrinks to contain only the leader.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ show-leader.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
...
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1
^C

At this point the leader node already holds a large amount of data.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:159391

Simulate a crash of the leader node. The followers also hold part of the data, but it is unusable (they are out of the ISR and unclean election is disabled).
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3d3dacd9b095    DOWN                    UNKNOWN               
kafka2          931bd63b61e2    UP      172.17.0.4      SLOW                  
kafka3          955172d1a496    UP      172.17.0.5      SLOW                  
zk1             94404576cf08    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka2 test1
[2021-12-04 08:53:54,124] WARN [AdminClient clientId=adminclient-1] Connection to node 1 (/172.17.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: none    Replicas: 1,2,3 Isr: 1
        
Simulate an unrecoverable disk failure on the original leader: all of its data is gone.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ sudo rm -rf volumes/kafka/01/data/
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ mkdir -p volumes/kafka/01/data/

Start the original leader node.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade start kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3d3dacd9b095    UP      172.17.0.3      NORMAL                
kafka2          931bd63b61e2    UP      172.17.0.4      SLOW                  
kafka3          955172d1a496    UP      172.17.0.5      SLOW                  
zk1             94404576cf08    UP      172.17.0.2      NORMAL

The original leader rejoins the cluster normally, but all of the data the cluster previously held is gone.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:0
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade fast kafka2 kafka3    
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:0
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka2 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          3d3dacd9b095    DOWN                    UNKNOWN               
kafka2          931bd63b61e2    UP      172.17.0.4      NORMAL                
kafka3          955172d1a496    UP      172.17.0.5      NORMAL                
zk1             94404576cf08    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka2 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: min.insync.replicas=2
        Topic: test1    Partition: 0    Leader: 2       Replicas: 1,2,3 Isr: 2,3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:0

Logs from the original leader restarting after the data loss:
Finding no trace of its previous state ("No checkpointed highwatermark is found for partition test1-0"), it logs "Log loaded for partition test1-0 with initial high watermark 0" and then "test1-0 starts at leader epoch 2 from offset 0 with high watermark 0".

[2021-12-05 02:14:05,737] ERROR [Broker id=1] Received LeaderAndIsrRequest with correlation id 1 from controller 3 epoch 2 for partition test1-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:14:05,799] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-05 02:14:05,805] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 34 ms (kafka.log.Log)
[2021-12-05 02:14:05,810] INFO Created log for partition test1-0 in /var/lib/kafka/data/test1-0 with properties {min.insync.replicas=2} (kafka.log.LogManager)
[2021-12-05 02:14:05,811] INFO [Partition test1-0 broker=1] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,811] INFO [Partition test1-0 broker=1] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,811] ERROR [Broker id=1] Received LeaderAndIsrRequest with correlation id 1 from controller 3 epoch 2 for partition __confluent.support.metrics-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:14:05,815] INFO [Log partition=__confluent.support.metrics-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-05 02:14:05,815] INFO [Log partition=__confluent.support.metrics-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)
[2021-12-05 02:14:05,816] INFO Created log for partition __confluent.support.metrics-0 in /var/lib/kafka/data/__confluent.support.metrics-0 with properties {retention.ms=31536000000} (kafka.log.LogManager)
[2021-12-05 02:14:05,816] INFO [Partition __confluent.support.metrics-0 broker=1] No checkpointed highwatermark is found for partition __confluent.support.metrics-0 (kafka.cluster.Partition)
[2021-12-05 02:14:05,816] INFO [Partition __confluent.support.metrics-0 broker=1] Log loaded for partition __confluent.support.metrics-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-05 02:14:10,185] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0, __confluent.support.metrics-0) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:10,191] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 2 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-05 02:14:10,200] INFO [Partition __confluent.support.metrics-0 broker=1] __confluent.support.metrics-0 starts at leader epoch 2 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-05 02:14:11,394] INFO [Partition test1-0 broker=1] Expanding ISR from 1 to 1,2 (kafka.cluster.Partition)
[2021-12-05 02:14:11,401] INFO [Partition test1-0 broker=1] ISR updated to [1,2] and zkVersion updated to [4] (kafka.cluster.Partition)
[2021-12-05 02:14:14,459] INFO [Partition test1-0 broker=1] Expanding ISR from 1,2 to 1,2,3 (kafka.cluster.Partition)
[2021-12-05 02:14:14,462] INFO [Partition test1-0 broker=1] ISR updated to [1,2,3] and zkVersion updated to [5] (kafka.cluster.Partition)

Logs from a follower:
After hitting UNKNOWN_LEADER_EPOCH errors, it truncates its local data ("Truncating to offset 0").

[2021-12-05 02:13:42,140] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test1-0=(fetchOffset=57962, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=115803198, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:45,821] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Connection to node 1 (kafka1/172.17.0.3:19092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-12-05 02:13:45,822] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=115803198, epoch=INITIAL) to node 1: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to kafka1:19092 (id: 1 rack: null) failed.
        at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:103)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:45,822] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={test1-0=(fetchOffset=57962, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[0])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=115803198, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to kafka1:19092 (id: 1 rack: null) failed.
        at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:103)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 02:13:50,568] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:53,741] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:57,128] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:13:59,409] ERROR [Broker id=2] Received LeaderAndIsrRequest with correlation id 3 from controller 3 epoch 2 for partition test1-0 (last update controller epoch 2) but cannot become follower since the new leader -1 is unavailable. (state.change.logger)
[2021-12-05 02:13:59,689] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:14:03,821] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition test1-0 at offset 57962 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2021-12-05 02:14:06,095] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:06,096] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=57962, leaderEpoch=2)) (kafka.server.ReplicaFetcherManager)
[2021-12-05 02:14:07,130] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test1-0 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
[2021-12-05 02:14:10,372] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test1-0 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
[2021-12-05 02:14:11,381] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to offset 0 (kafka.log.Log)
[2021-12-05 02:14:11,383] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Scheduling segments for deletion List() (kafka.log.Log)
[2021-12-05 02:14:11,390] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2

Window two commands (run after the Kafka cluster setup and topic creation are complete):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Success: 110000 Failed: 0
Success: 120000 Failed: 0
Success: 130000 Failed: 0
Traceback (most recent call last):
  File "producer.py", line 65, in <module>
    p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 237429
Delivered: 137429
Failed: 0
%4|1638607813.104|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (600000 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 1000000 0.0001 all test1
%5|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60011ms, timeout #0)
%5|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60005ms, timeout #1)
%4|1638607895.990|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 2 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
%3|1638607895.990|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 2 request(s) timed out: disconnect (after 60072ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Traceback (most recent call last):
  File "producer.py", line 65, in <module>
    p.produce(topic, str(data).encode('utf-8'), callback=delivery_report)
BufferError: Local: Queue full
Sent: 110645
Delivered: 1926
Failed: 8719
%4|1638607913.414|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 100000 messages (510646 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

  • Test version: confluentinc/cp-kafka:6.2.1

Window one commands (the blockade slow and blockade kill commands are issued while window two is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window two commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

  • Test version: confluentinc/cp-kafka:7.0.0

Window one commands (the blockade slow and blockade kill commands are issued while window two is writing data):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Window two commands (run after the Kafka cluster setup and topic creation are complete):

No difference from the confluentinc/cp-kafka:5.5.6 results.

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    Once the ISR has shrunk to the leader alone for whatever reason (slow nodes, slow network, node crashes, and so on), if that leader then crashes and its disk fails unrecoverably, rejoining the cluster with an empty data directory wipes the entire topic's data, even though the other followers still hold part of it.

    In earlier versions this situation would crash the whole cluster; the followers would log something like:

    log from kafka2

    [2021-10-13 06:09:03,601] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 100955 (kafka.server.ReplicaFetcherThread)

    log from kafka3

    [2021-10-13 06:09:02,856] ERROR [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 100955 (kafka.server.ReplicaFetcherThread)

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results.

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results.


Review of the data-loss scenarios

Drawing on the tests above, here is a summary of the situations in which Kafka can lose data:

  • A leader failover happens while acks=1
  • Even with acks=all, unclean failover is allowed (the follower elected as the new leader is no longer in the ISR)
  • With acks=1, a network partition between the leader and zookeeper causes a brief split-brain
  • With the ISR already shrunk to the leader alone, the leader becomes isolated from the other Kafka nodes and from zookeeper; even with acks=all the old leader keeps accepting writes, which are then lost, especially when min.insync.replicas=1. Worse, if that leader then suffers an unrepairable disk failure, restoring it with an empty data directory ultimately wipes the entire topic
  • All replicas of a partition fail at the same time; because writes are acknowledged once they reach memory, the data may never have been flushed to disk and is lost when the nodes come back



The situations above can be mitigated to a degree through configuration. Unclean failover can be prevented with unclean.leader.election.enable=false, and setting min.insync.replicas to 2 or more guarantees that every acknowledged write is held by at least two replicas. Environments that demand high reliability typically set acks=all together with min.insync.replicas greater than 1 (a config sketch follows the list below). The costs of doing so:

  • Higher write latency and lower throughput
  • Once the ISR degrades to the leader alone, writes are rejected, reducing availability
  • Because failover to an unclean node is forbidden, an unrecoverable leader failure leaves the partition unavailable even though other nodes hold part of the data, again reducing availability
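
A hedged sketch of this reliability-first combination (topic name and addresses are placeholders, not the scripts used in the tests):

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:19092'})
for t, f in admin.create_topics([NewTopic(
        'orders', num_partitions=3, replication_factor=3,
        config={'min.insync.replicas': '2',
                'unclean.leader.election.enable': 'false'})]).items():
    f.result()   # raises if creation failed

producer = Producer({
    'bootstrap.servers': 'localhost:19092',
    'acks': 'all',               # wait for every ISR member
    'enable.idempotence': True,  # safe retries across failovers
})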
Original title: Kafka 容错及高可用测试一 / via 民生运维人