最近的工作内容涉及到了将业务数据导入到分布式数据库中,在排查问题的时候涉及到了Kafka。之前基本没有用到,趁着这次机会把这Kafka在日常运维中用到工具做了个学习。也因此有了这个笔记。
声明:本文中命令的例子是基于2.4.1 版本测试的
查看版本命令
[root@VM-24-17-centos kafka]#./bin/kafka-console-consumer.sh --version
2.4.1 (Commit:c57222ae8cd7866b)
Topic管理命令
bin/kafka-topics.sh 命令
参数介绍
--bootstrap-server : 指定连接到kafka的参数, host:port形式,和指定zk参数不可同时使用 --zookeeper:基本弃用了,通过连zk方式接到kafka集群, host:port --replication-factor 副本数量,不可打过broker数量,不指定使用集群默认配置 --replication-factor 3 --partitions 分区数量,默认为集群配置参数,修改时注意如果减少可能会有问题, --partitions 3 --replica-assignment 设置副本分区分配方式,创建topic时使用 BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ;
例子意思是有三个分区和三个副本,对应分配的broker, 逗号隔开标识分区;冒号隔开表示副本--config 设置某topic级别配置覆盖默认配置;只在--create 和--bootstrap 同时使用时生效。 --config retention.bytes=123455 --config retention.ms=600001--command-config 配置客户端admin Client启动配置,只在和--bootstrap-server配合时生效。例如:设置请求的超时时间 --command-config config/producer.proterties ; 然后在文件中配置 request.timeout.ms=300000
使用例子
列出所有topic
./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --list
输出某个topic 的相信信息
--exclude-internal 排序kafka内部topic 如 __consumer_offsets-*
指定topic输出
./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --topic maxwell --describe
正则匹配,批量列出
./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --describe --topic ".*?"
创建topic
注意名字不要使用 .
和_
./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --create --topic test1 --partitions 1 --replication-factor 1
删除topic
./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --delete --topic test_1
对topic进行扩容
当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进行;
--alter 是对topic进行 修改操作; Alter the number of partitions,replica assignment, and/or configuration for the topic.
对单个进行扩容
./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --alter --topic test_maxbtyes --partitions 2
批量扩容
./bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
(.*? 是匹配所有)
修改配置命令
bin/kafka-configs.sh 命令
强烈建议指定group 名字创建 消费者,这样在进行运维修改时才可方便执行命令
当前版本无法支持使用 --bootstrap-server,或者自己用的方法不对,测试总是失败
展示关于Topic的动静态配置
展示所有topic
./bin/kafka-configs.sh --zookeeper 10.0.24.17:2181 --entity-type topics --describe
展示某个topic
./bin/kafka-configs.sh --zookeeper 10.0.24.17:2181 --entity-type topics --entity-name maxwell --describe
增删改配置
--alter --add-config --delete-config 配合 --entity-type 设置修改类型 --entity-name修改对应类型的名字
删除配置
./bin/kafka-configs.sh --zookeeper 10.0.24.17:2181 --entity-type topics --entity-name maxwell --alter --delete-config 'max.message.bytes'
增加配置
./bin/kafka-configs.sh --zookeeper 10.0.24.17:2181 --entity-type topics --entity-name maxwell --alter --add-config 'max.message.bytes=20000000'
优先级 指定动态配置>默认动态配置>静态配置
向Topic 发送消息
命令 bin/kafka-console-producer.sh
./bin/kafka-console-producer.sh --broker-list 10.0.24.17:9090 --topic test_maxbtyes
参数 值类型 说明 有效值
--bootstrap-server 要连接的服务器必需(除非指定--broker-list) - 如:host1:prot1,host2:prot2 --topic String (必需)接收消息的主题名称 --batch-size 单个批处理中发送的消息数 200(默认值) --compression-codec 压缩编解码器 none、gzip(默认值)snappy、lz4、zstd --max-block-ms 在发送请求期间,生产者将阻止的最长时间 60000(默认值) --max-memory-bytes 生产者用来缓冲等待发送到服务器的总内存 33554432(默认值) --max-partition-memory-bytes 为分区分配的缓冲区大小 16384 --message-send-max-retries 最大的重试发送次数 3 --metadata-expiry-ms 强制更新元数据的时间阈值(ms) 300000 --producer-property 将自定义属性传递给生成器的机制 如:key=value --producer.config 生产者配置属性文件[--producer-property]优先于此配置 - 配置文件完整路径 --property 自定义消息读取器 parse.key=true/false - key.separator=<key.separator>ignore.error=true/false --request-required-acks 生产者请求的确认方式 0、1(默认值)、all --request-timeout-ms 生产者请求的确认超时时间 1500(默认值) --retry-backoff-ms 生产者重试前,刷新元数据的等待时间阈值 100(默认值) --socket-buffer-size TCP接收缓冲大小 102400(默认值) --timeout 消息排队异步等待处理的时间阈值 1000(默认值) --sync 同步发送消息 --version 显示 版本 不配合其他参数时,显示为本地Kafka版本 --help 打印帮助信息
Topic 的消费
命令 bin/kafka-console-consumer.sh
强烈建议指定group 名字创建 消费者,这样在进行运维修改时才可方便执行命令
从开始进行消费
指定使用某 group进行消费 topic
/bin/kafka-console-consumer.sh --bootstrap-server 10.0.24.17:9092 --topic maxwell --group console-consumer-75118 --from-beginning
不指定使用某 group进行消费 topic 也起到使用新名称的效果
./bin/kafka-console-consumer.sh --bootstrap-server 10.0.24.17:9092 --topic maxwell --from-beginning
获取所有消费组
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --list
使用正则表达式消费多个topic
./bin/kafka-console-consumer.sh --bootstrap-server 10.0.24.17:9092 --whitelist 'maxwell.*'
指定分区消费 , 指定起始偏移量消费 分区 --partition 指定起始偏移量消费 --offset
./bin/kafka-console-consumer.sh --bootstrap-server 10.0.24.17:9092 --topic maxwell --partition 0 --offset 30000
添加客户端属性
--consumer-property :使用命令行方式增加属性,注意不用和其他参数有重复属性的添加 --consumer.config:指定配置文件来设置客户端属性 优先级:--consumer-property 大于 --consumer.config
指定使用某 group进行消费 topic
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
消费者管理
kafka_consumer_groups.sh
列出消费者列表
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --list
查看消费者详情 --describe
查看指的group 的信息
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-49595 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-49595 maxwell 0 39683 39686 3 - - -
显示该消费组消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID信息等信息
查看所有group 信息
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --all-groups --describe
查看消费者状态信息 --state
查看指定group 状态
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-75118 --state --describe
查看所有group 状态
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --all-groups --state --describe
删除消费者组想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报错
删除指定group
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --delete --group console-consumer-49595
删除所有group
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --delete --all-groups
重置消费组偏移量 ==注意点:==
被修改消费组为不可用状态使用 --describe --state查看 : Stable,CompletingRebalanc
都不行先试用 --dry-run测试是否可以执行 再使用 --execute 执行 在设置偏移量后,开启消费程序必须指定消费程序,使用该group且不能再指定点位(--from-beginning) --from-file以外的配置都是对该消费组的所有partition 统一设置值,无法做到指定某一个partition的偏移量
参数
--to-earliest : 重置offset到最开始的那条offset(找到还未被删除最早的那个offset) --to-current: 直接重置offset到当前的offset,也就是LOE --to-latest: 重置到最后一个offset --to-datetime: 重置到指定时间的offset;格式为:YYYY-MM-DDTHH:mm:SS.sss; --to-datetime "2021-6-26T00:00:00.000" --to-offset 重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset<--to-offset, 这个时候重置为目标最大offset;2.目标最小offset>--to-offset ,则重置为最小; 3.否则的话才会重置为--to-offset的目标值; 一般不用这个 --to-off
以--to-offset 为例
#1. 查看状态
[root@VM-24-17-centos kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --all-groups --describe --state
GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
console-consumer-70180 VM-24-17-centos:9092 (0) Empty 0
GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
console-consumer-75118 VM-24-17-centos:9092 (0) CompletingRebalance 1
#2. console-consumer-70180 可进行修改
测试命令是否可执行
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-70180 --reset-offsets --to-offset 35000 --dry-run --topic maxwell
真正进行修改
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-70180 --reset-offsets --to-offset 35000 --execute --topic maxwell
#3 查看修改结果
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-70180 --describe
Consumer group 'console-consumer-70180' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-70180 maxwell 0 35000 39686 4686 - - -
#4 启动消费程序
./bin/kafka-console-consumer.sh --bootstrap-server 10.0.24.17:9092 --topic maxwell --group console-consumer-70180
使用--from-file 灵活指定消费组偏移量--from-file 需要配合 csv 文件指定每个partition 的点位 规则是:一行为一个分区,使用逗号分割如: topic,partition,offset
[root@VM-24-17-centos kafka]# cat fdsafdas123.csv
test_maxbtyes,0,3 >>>>>> #(topic:test_maxbtyes partition:0 offset:3)
test_maxbtyes,1,40
具体例子
#1 查看消费组状态
[root@VM-24-17-centos kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --describe --group fdsafdas123 --state
#2 创建csv文件
[root@VM-24-17-centos kafka]# cat fdsafdas123.csv
test_maxbtyes,0,3
test_maxbtyes,1,40
#3 修改偏移量
$--dry-run 测试
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --reset-offsets --dry-run --group fdsafdas123 --from-file fdsafdas123.csv
$ --execute 执行修改
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --reset-offsets --execute --group fdsafdas123 --from-file fdsafdas123.csv
#4 查看状态
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --describe --group fdsafdas123 --state
其他
kafka-verifiable-producer.sh用于测试验证生产者的功能。 kafka-verifiable-consumer.sh用于测试验证消费者功能 kafka_producer_perf_test.sh 生产者压力测试 kafka_consumer_perf_test.sh 消费者压力测试 kafka_delete_records.sh 删除指定分区的消息,和指定配置文件xxxx.json配合 bin/kafka-server-stop.sh 关闭程序名 bin/kafka-server-start.sh 启动kafka程序




