Kafka版本:2.5.1
工作中,常采用Kafka作为消息中间件,Kafka是基于日志的消息组件,消费者从Broker上读取数据后,日志数据并不会立即删除,而是保存相应的Offset,以记录消息消费的位置。
实际业务中,有时需要重新定位当前消费组的消费位移以实现数据的重复消费,因Kafka的消息在消费者消费后并未立刻删除,这也为消费位移的重置提供了可能。
线上环境可以使用Kafka提供的脚本对消费位移进行重置。
Kafka安装位置的bin目录下是Kafka相关的各种命令脚本,其中消费者组的位移重试脚本是 kafka-consumer-groups.sh。
该脚本有以下三个主要功能:
展示所有的消费者组列表
对消费者组进行删除操作
重置消费者组的消费位移
结合本文的重心,主要关注如何重置消费者组的消费位移这部分的功能。
进入Kafka的安装目录,执行./bin/kafka-consumer-groups.sh --help,可以显示帮助文档:
./bin/kafka-consumer-groups.sh --help
帮助文档中有一段很清晰的描述如何进行消费组位移的重置:
--reset-offsets Reset offsets of consumer group.Supports one consumer group at thetime, and instances should beinactive Has execution options:--execute to updatethe offsets. Additionally, the --export option is used to export theresults to a CSV format.You must choose one of the followingreset specifications: --to-datetime,--by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current.To define the scope use --all-topicsor --topic. One scope must bespecified unless you use '--from-file'.
如文档描述,进行位移重置的时候,需使用--execute命令,同时必须指定位移的重置规则:
--to-datetime :调整位移到大于等于该时间的最大位移处
--to-earliest :调整位移到最早位移处
--to-latest :调整位移到最晚位移处
--shift-by :调整位移到当前位移 +N 处
--from-file :调整位移到文件内容所指定的位移处
--to-current :调整位移到当前最新提交位移处
消费组位移重置命令参考(重置规则的使用请参考帮助文档):
./bin/kafka-consumer-groups.sh --bootstrap-server host:port--group 消费者组名称 --reset-offsets 重置规则 --execute
实际使用中结合帮助文档,可以快速完成消费者组位移的重置工作,希望感兴趣的小伙伴能够动手操作以加深使用的印象。
因作者能力有限,文中可能出现描述不清的地方,不足之处请指正!希望与你一起共进步^




