Kafka
注: 这里机器我只写一个。命令你们也可使用 ./bin/xx.sh (如:./bin/kafka-topics.sh)
查看当前服务器中的所有topic
kafka-topics --zookeeper xxxxxx:2181 --list --exclude-internal 说明: exclude-internal:排除kafka内部topic 比如: --exclude-internal --topic "test_.*"
创建topic
kafka-topics --zookeeper xxxxxx:2181 --create --replication-factor --partitions 1 --topic topic_name 说明: --topic 定义topic名 --replication-factor 定义副本数 --partitions 定义分区数
删除topic
注意: 需要server.properties中设置delete.topic.enable=true否则只是标记删除
kafka-topics --zookeeper xxxxxx:2181 --delete --topic topic_name
生产者
kafka-console-producer --broker-list xxxxxx:9092 --topic topic_name 可加:--property parse.key=true(有key消息)
消费者
kafka-console-consumer --bootstrap-server xxxxxx:9092 --topic topic_name 注:可选 --from-beginning:会把主题中以往所有的数据都读取出来 --whitelist '.*' :消费所有的topic --property print.key=true:显示key进行消费 --partition 0:指定分区消费 --offset:指定起始偏移量消费
查看某个Topic的详情
kafka-topics --zookeeper xxxxxx:2181 --describe --topic topic_name
修改分区数
kafka-topics --zookeeper xxxxxx:2181 --alter --topic topic_name --partitions 6
查看某个消费者组信息
kafka-consumer-groups --bootstrap-server xxxxxx:9092 --describe --group group_name
删除消费者组
kafka-consumer-groups --bootstrap-server xxxxxx:9092 ---delete --group group_name
重置offset
kafka-consumer-groups --bootstrap-server xxxxxx:9092 --group group_name --reset-offsets --all-topics --to-latest --execute
leader重新选举
指定Topic指定分区用重新PREFERRED:优先副本策略 进行Leader重选举
kafka-leader-election --bootstrap-server xxxxxx:9092 --topic topic_name --election-type PREFERRED --partition 0
所有Topic所有分区用重新PREFERRED:优先副本策略 进行Leader重选举
kafka-leader-election --bootstrap-server xxxxxx:9092 --election-type preferred --all-topic-partitions
查询kafka版本信息
kafka-configs --bootstrap-server xxxxxx:9092 --describe --version
增删改配置
| 功能说明 | 参数 |
|---|---|
| 选择类型 | –entity-type (topics/clients/users/brokers/broker- loggers) |
| 类型名称 | –entity-name |
| 删除配置 | –delete-config k1=v1,k2=v2 |
| 添加/修改配置 | –add-config k1,k2 |
topic添加/修改动态配置
kafka-configs --bootstrap-server xxxxxx:9092 --alter --entity-type topics --entity-name topic_name --add-config file.delete.delay.ms=222222,retention.ms=999999
topic删除动态配置
kafka-configs --bootstrap-server xxxxxx:9092 --alter --entity-type topics --entity-name topic_name --delete-config file.delete.delay.ms,retention.ms
持续批量拉取消息
单次最大消费10条消息(不加参数意为持续消费)
kafka-verifiable-consumer --bootstrap-server xxxxxx:9092 --group group_name --topic topic_name --max-messages 10
删除指定分区的消息
删除指定topic的某个分区的消息删除至offset为1024
json文件offset-json-file.json
{
"partitions": [
{
"topic": "topic_name",
"partition": 0,
"offset": 1024
}
],
"version": 1
}
kafka-delete-records --bootstrap-server xxxxxx:9092 --offset-json-file offset-json-file.json
查看Broker磁盘信息
查询指定topic磁盘信息
kafka-log-dirs --bootstrap-server xxxxxx:9090 --describe --topic-list topic1,topic2
查询指定Broker磁盘信息
kafka-log-dirs --bootstrap-server xxxxxx:9090 --describe --topic-list topic1 --broker-list 0
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




