暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka命令笔记

天天李拜天DBA 2023-09-15
304

最近的工作内容涉及到了将业务数据导入到分布式数据库中,在排查问题的时候涉及到了Kafka。之前基本没有用到,趁着这次机会把这Kafka在日常运维中用到工具做了个学习。也因此有了这个笔记。

声明:本文中命令的例子是基于2.4.1 版本测试的

查看版本命令

    [root@VM-24-17-centos kafka]#./bin/kafka-console-consumer.sh --version
    2.4.1 (Commit:c57222ae8cd7866b)

Topic管理命令

bin/kafka-topics.sh 命令

参数介绍

  • --bootstrap-server : 指定连接到kafka的参数, host:port形式,和指定zk参数不可同时使用
  • --zookeeper:基本弃用了,通过连zk方式接到kafka集群, host:port
  • --replication-factor 副本数量,不可打过broker数量,不指定使用集群默认配置 --replication-factor 3
  • --partitions  分区数量,默认为集群配置参数,修改时注意如果减少可能会有问题, --partitions 3
  • --replica-assignment 设置副本分区分配方式,创建topic时使用BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ;
    例子意思是有三个分区和三个副本,对应分配的broker, 逗号隔开标识分区;冒号隔开表示副本
  • --config 设置某topic级别配置覆盖默认配置;只在--create 和--bootstrap 同时使用时生效。--config retention.bytes=123455 --config retention.ms=600001
  • --command-config 配置客户端admin Client启动配置,只在和--bootstrap-server配合时生效。例如:设置请求的超时时间 --command-config config/producer.proterties ; 然后在文件中配置 request.timeout.ms=300000

使用例子

列出所有topic
    ./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --list

输出某个topic 的相信信息
  • --exclude-internal 排序kafka内部topic 如 __consumer_offsets-*
指定topic输出
./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --topic maxwell --describe

正则匹配,批量列出
./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --describe --topic ".*?"

创建topic

注意名字不要使用 .
_

    ./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --create --topic test1 --partitions 1 --replication-factor 1

删除topic
     ./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --delete --topic test_1

对topic进行扩容

当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进行;

  • --alter  是对topic进行 修改操作;Alter the number of partitions,replica assignment, and/or configuration for the topic.
对单个进行扩容
 ./bin/kafka-topics.sh --bootstrap-server 10.0.24.17:9092 --alter --topic test_maxbtyes --partitions 2
批量扩容
 ./bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
   (.*? 是匹配所有)

修改配置命令

bin/kafka-configs.sh 命令

强烈建议指定group 名字创建 消费者,这样在进行运维修改时才可方便执行命令

当前版本无法支持使用 --bootstrap-server,或者自己用的方法不对,测试总是失败

展示关于Topic的动静态配置
展示所有topic

    ./bin/kafka-configs.sh  --zookeeper 10.0.24.17:2181 --entity-type topics  --describe

展示某个topic

    ./bin/kafka-configs.sh --zookeeper 10.0.24.17:2181    --entity-type topics --entity-name maxwell --describe

增删改配置
  • --alter
  • --add-config
  • --delete-config 配合 --entity-type 设置修改类型 --entity-name修改对应类型的名字
删除配置
 ./bin/kafka-configs.sh --zookeeper 10.0.24.17:2181  --entity-type topics --entity-name maxwell --alter  --delete-config 'max.message.bytes'

增加配置
./bin/kafka-configs.sh --zookeeper 10.0.24.17:2181  --entity-type topics --entity-name maxwell --alter  --add-config  'max.message.bytes=20000000'

优先级 指定动态配置>默认动态配置>静态配置

向Topic 发送消息

命令 bin/kafka-console-producer.sh

./bin/kafka-console-producer.sh  --broker-list 10.0.24.17:9090 --topic test_maxbtyes

参数 值类型 说明 有效值

  • --bootstrap-server 要连接的服务器必需(除非指定--broker-list) - 如:host1:prot1,host2:prot2
  • --topic String (必需)接收消息的主题名称
  • --batch-size 单个批处理中发送的消息数 200(默认值)
  • --compression-codec 压缩编解码器 none、gzip(默认值)snappy、lz4、zstd
  • --max-block-ms 在发送请求期间,生产者将阻止的最长时间 60000(默认值)
  • --max-memory-bytes 生产者用来缓冲等待发送到服务器的总内存 33554432(默认值)
  • --max-partition-memory-bytes 为分区分配的缓冲区大小 16384
  • --message-send-max-retries 最大的重试发送次数 3
  • --metadata-expiry-ms 强制更新元数据的时间阈值(ms) 300000
  • --producer-property 将自定义属性传递给生成器的机制 如:key=value
  • --producer.config 生产者配置属性文件[--producer-property]优先于此配置 - 配置文件完整路径
  • --property 自定义消息读取器 parse.key=true/false - key.separator=<key.separator>ignore.error=true/false
  • --request-required-acks 生产者请求的确认方式 0、1(默认值)、all
  • --request-timeout-ms 生产者请求的确认超时时间 1500(默认值)
  • --retry-backoff-ms 生产者重试前,刷新元数据的等待时间阈值 100(默认值)
  • --socket-buffer-size TCP接收缓冲大小 102400(默认值)
  • --timeout 消息排队异步等待处理的时间阈值 1000(默认值)
  • --sync 同步发送消息
  • --version 显示  版本 不配合其他参数时,显示为本地Kafka版本
  • --help 打印帮助信息

Topic 的消费

命令 bin/kafka-console-consumer.sh

强烈建议指定group 名字创建 消费者,这样在进行运维修改时才可方便执行命令

  • 从开始进行消费
  • 指定使用某 group进行消费 topic
/bin/kafka-console-consumer.sh  --bootstrap-server 10.0.24.17:9092 --topic maxwell --group console-consumer-75118  --from-beginning

  • 不指定使用某 group进行消费 topic 也起到使用新名称的效果
 ./bin/kafka-console-consumer.sh  --bootstrap-server 10.0.24.17:9092 --topic maxwell   --from-beginning

  • 获取所有消费组
./bin/kafka-consumer-groups.sh --bootstrap-server  10.0.24.17:9092 --list

  • 使用正则表达式消费多个topic
./bin/kafka-console-consumer.sh  --bootstrap-server 10.0.24.17:9092 --whitelist 'maxwell.*'

  • 指定分区消费 , 指定起始偏移量消费 分区 --partition 指定起始偏移量消费 --offset
 ./bin/kafka-console-consumer.sh  --bootstrap-server 10.0.24.17:9092 --topic maxwell  --partition 0 --offset 30000

  • 添加客户端属性

--consumer-property :使用命令行方式增加属性,注意不用和其他参数有重复属性的添加 --consumer.config:指定配置文件来设置客户端属性 优先级:--consumer-property  大于 --consumer.config

  • 指定使用某 group进行消费 topic
  sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group
  sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties

消费者管理

kafka_consumer_groups.sh

  • 列出消费者列表
 ./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --list

  • 查看消费者详情 --describe

查看指的group 的信息
 ./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-49595  --describe 

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
console-consumer-49595 maxwell         0          39683           39686           3               -               -               -
显示该消费组消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID信息等信息

查看所有group 信息
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --all-groups  --describe

  • 查看消费者状态信息 --state

    查看指定group 状态
    ./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-75118  --state --describe

    查看所有group 状态
    ./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --all-groups   --state --describe

  • 删除消费者组想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报错
删除指定group
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --delete --group console-consumer-49595 

删除所有group
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --delete --all-groups


  • 重置消费组偏移量 ==注意点:==
  1. 被修改消费组为不可用状态使用 --describe  --state查看 :Stable,CompletingRebalanc
    都不行
  2. 先试用 --dry-run测试是否可以执行 再使用 --execute 执行
  3. 在设置偏移量后,开启消费程序必须指定消费程序,使用该group且不能再指定点位(--from-beginning)
  4. --from-file以外的配置都是对该消费组的所有partition 统一设置值,无法做到指定某一个partition的偏移量
参数

--to-earliest : 重置offset到最开始的那条offset(找到还未被删除最早的那个offset) --to-current: 直接重置offset到当前的offset,也就是LOE --to-latest: 重置到最后一个offset --to-datetime: 重置到指定时间的offset;格式为:YYYY-MM-DDTHH:mm:SS.sss; --to-datetime "2021-6-26T00:00:00.000" --to-offset 重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset<--to-offset, 这个时候重置为目标最大offset;2.目标最小offset>--to-offset ,则重置为最小; 3.否则的话才会重置为--to-offset的目标值; 一般不用这个 --to-off

以--to-offset 为例

#1. 查看状态
[root@VM-24-17-centos kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --all-groups   --describe  --state
GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
console-consumer-70180    VM-24-17-centos:9092 (0)                       Empty           0

GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
console-consumer-75118    VM-24-17-centos:9092 (0)                       CompletingRebalance 1

#2. console-consumer-70180 可进行修改
测试命令是否可执行
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-70180   --reset-offsets --to-offset 35000 --dry-run --topic maxwell 
真正进行修改
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-70180   --reset-offsets --to-offset 35000 --execute --topic maxwell


#3 查看修改结果
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --group console-consumer-70180   --describe  

Consumer group 'console-consumer-70180' has no active members.

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
console-consumer-70180 maxwell         0          35000           39686           4686            -               -               -


#4 启动消费程序
 ./bin/kafka-console-consumer.sh --bootstrap-server 10.0.24.17:9092 --topic maxwell --group console-consumer-70180



  • 使用--from-file 灵活指定消费组偏移量--from-file 需要配合 csv 文件指定每个partition 的点位 规则是:一行为一个分区,使用逗号分割如: topic,partition,offset
 [root@VM-24-17-centos kafka]#  cat fdsafdas123.csv 
    test_maxbtyes,0,3 >>>>>>  #(topic:test_maxbtyes   partition:0   offset:3)
    test_maxbtyes,1,40

具体例子

#1 查看消费组状态
[root@VM-24-17-centos kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --describe --group fdsafdas123 --state

#2 创建csv文件
[root@VM-24-17-centos kafka]#  cat fdsafdas123.csv 
test_maxbtyes,0,3   
test_maxbtyes,1,40


#3 修改偏移量
 $--dry-run 测试
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --reset-offsets --dry-run --group fdsafdas123 --from-file fdsafdas123.csv 
 $ --execute 执行修改
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --reset-offsets --execute  --group fdsafdas123 --from-file fdsafdas123.csv 

#4 查看状态
./bin/kafka-consumer-groups.sh --bootstrap-server 10.0.24.17:9092 --describe --group fdsafdas123 --state


其他

  • kafka-verifiable-producer.sh用于测试验证生产者的功能。
  • kafka-verifiable-consumer.sh用于测试验证消费者功能
  • kafka_producer_perf_test.sh  生产者压力测试
  • kafka_consumer_perf_test.sh 消费者压力测试
  • kafka_delete_records.sh 删除指定分区的消息,和指定配置文件xxxx.json配合
  • bin/kafka-server-stop.sh 关闭程序名
  • bin/kafka-server-start.sh 启动kafka程序



文章转载自天天李拜天DBA,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论