暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka常用命令详解

原创 超越无限D 2022-09-17
1541

Kafka

注: 这里机器我只写一个。命令你们也可使用 ./bin/xx.sh (如:./bin/kafka-topics.sh)

查看当前服务器中的所有topic

kafka-topics --zookeeper xxxxxx:2181 --list --exclude-internal 说明: exclude-internal:排除kafka内部topic 比如: --exclude-internal --topic "test_.*"

创建topic

kafka-topics --zookeeper xxxxxx:2181 --create --replication-factor --partitions 1 --topic topic_name 说明: --topic 定义topic名 --replication-factor 定义副本数 --partitions 定义分区数

删除topic

注意: 需要server.properties中设置delete.topic.enable=true否则只是标记删除

kafka-topics --zookeeper xxxxxx:2181 --delete --topic topic_name

生产者

kafka-console-producer --broker-list xxxxxx:9092 --topic topic_name 可加:--property parse.key=true(有key消息)

消费者

kafka-console-consumer --bootstrap-server xxxxxx:9092 --topic topic_name 注:可选 --from-beginning:会把主题中以往所有的数据都读取出来 --whitelist '.*' :消费所有的topic --property print.key=true:显示key进行消费 --partition 0:指定分区消费 --offset:指定起始偏移量消费

查看某个Topic的详情

kafka-topics --zookeeper xxxxxx:2181 --describe --topic topic_name

修改分区数

kafka-topics --zookeeper xxxxxx:2181 --alter --topic topic_name --partitions 6

查看某个消费者组信息

kafka-consumer-groups --bootstrap-server xxxxxx:9092 --describe --group group_name

删除消费者组

kafka-consumer-groups --bootstrap-server xxxxxx:9092 ---delete --group group_name

重置offset

kafka-consumer-groups --bootstrap-server xxxxxx:9092 --group group_name --reset-offsets --all-topics --to-latest --execute

leader重新选举

指定Topic指定分区用重新PREFERRED:优先副本策略 进行Leader重选举

kafka-leader-election --bootstrap-server xxxxxx:9092 --topic topic_name --election-type PREFERRED --partition 0

所有Topic所有分区用重新PREFERRED:优先副本策略 进行Leader重选举

kafka-leader-election --bootstrap-server xxxxxx:9092 --election-type preferred --all-topic-partitions

查询kafka版本信息

kafka-configs --bootstrap-server xxxxxx:9092 --describe --version

增删改配置

功能说明 参数
选择类型 –entity-type (topics/clients/users/brokers/broker- loggers)
类型名称 –entity-name
删除配置 –delete-config k1=v1,k2=v2
添加/修改配置 –add-config k1,k2

topic添加/修改动态配置

kafka-configs --bootstrap-server xxxxxx:9092 --alter --entity-type topics --entity-name topic_name --add-config file.delete.delay.ms=222222,retention.ms=999999

topic删除动态配置

kafka-configs --bootstrap-server xxxxxx:9092 --alter --entity-type topics --entity-name topic_name --delete-config file.delete.delay.ms,retention.ms

持续批量拉取消息

单次最大消费10条消息(不加参数意为持续消费)

kafka-verifiable-consumer --bootstrap-server xxxxxx:9092 --group group_name --topic topic_name --max-messages 10

删除指定分区的消息

删除指定topic的某个分区的消息删除至offset为1024

json文件offset-json-file.json

{
    "partitions": [
        {
            "topic": "topic_name",
            "partition": 0,
            "offset": 1024
        }
    ],
    "version": 1
}
kafka-delete-records --bootstrap-server xxxxxx:9092 --offset-json-file offset-json-file.json

查看Broker磁盘信息

查询指定topic磁盘信息

kafka-log-dirs --bootstrap-server xxxxxx:9090 --describe --topic-list topic1,topic2

查询指定Broker磁盘信息

kafka-log-dirs --bootstrap-server xxxxxx:9090 --describe --topic-list topic1 --broker-list 0
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论