概述
对于很早就使用Kafka的朋友,尤其是早期版本(0.10.x之前),在做类似于消费延迟监控功能时,肯定用过至少知道kafka-run-class.sh kafka.tools.GetOffsetShell工具类。不清楚的朋友,可以问下度娘,很多文章都做过介绍。

对于没使用过早期版本Kafka的朋友,可能不知道kafka.tools.GetOffsetShell工具类。但是,我敢肯定,大多数人一定知道kafka.admin.ConsumerGroupCommand工具类,也就是kafka-consumer-groups.sh。ConsumerGroupCommand类除了基本涵盖GetOffsetShell功能外,还有很多其他功能,如:消费组管理、offset重置等等。
其实,后续版本很少有人用GetOffsetShell类,绝大多数使用ConsumerGroupCommand。那么,本文的意义何在?乙方没办法啊,甲方客户非要用GetOffsetShell。甲方的诉求:GetOffsetShel不支持kerberos!!
GetOffsetShell类剖析
该类在Kafka源码中并未提供类似kafka-consumer-groups.sh的脚本工具。
具体使用方法见:

可见:--time -1表示查询最新offset(或者叫最大offset),time -2表示查询最旧offset(或者叫最小offset),--timetimestamp表示查询指定时间的offset。
实际样例:
[root@felixzhbin]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-listlocalhost:9092 --topic test --time -2[root@felixzh bin]# ./kafka-run-class.shkafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1[root@felixzh bin]# ./kafka-run-class.shkafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test--time 1495245100000
该类在源码中路径:
kafka\core\src\main\scala\kafka\tools\GetOffsetShell.scala
功能很简单,就是构造一个KafkaConsumer实例,从kafka查询指定Topic的信息。

如果想支持kerberos,KafkaConsumer使用的config配置项需要传入security.protocol=SASL_PLAINTEXT。很遗憾,该类并未提供可传入的参数项,仅提供参数:
brokerList、topic、partition、time。

GetOffsetShell类改造
思路:增加可传入参数项command-config,将其传入到KafkaConsumer实例config配置。详细代码如下(git diff):

源码编译打包测试
编译打包命令:
$ gradle clean releaseTarGz -x test

因为GetOffsetShell类位于kafka_2.11-2.3.0.jar,只需替换该jar包即可。

测试结果

GetOffsetShell类社区动态
翻了下社区动态,早在2020年就有外国友人提出Kafka优化建议KIP-635(GetOffsetShell: support formultiple topics and consumer configuration override),指出GetOffsetShell类需要支持指定多个topic和指定consumer配置项重写。详见https://cwiki.apache.org/confluence/display/KAFKA/KIP-635%3A+GetOffsetShell%3A+support+for+multiple+topics+and+consumer+configuration+override
目前Kafka trunk分支和即将发布的3.0版本,已经有人合入相关实现KAFKA-5235,详见https://github.com/apache/kafka/pull/9430
本文暂不提支持多topic,只说说持指定consumer配置项重写,源码中的实现与上述我的写法类似(话说回来,本来就是很简单的功能,任何人估计都不能写出花来),如下:

---------------------------分割线---------------------------------------------

完整代码详见地址:
https://github.com/apache/kafka/blob/3.0/core/src/main/scala/kafka/tools/GetOffsetShell.scala




