暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

死磕Kafka源码:GetOffsetShell工具类详解

大数据从业者 2021-08-25
4761

概述

对于很早就使用Kafka的朋友,尤其是早期版本(0.10.x之前),在做类似于消费延迟监控功能时,肯定用过至少知道kafka-run-class.sh kafka.tools.GetOffsetShell工具类。不清楚的朋友,可以问下度娘,很多文章都做过介绍。


对于没使用过早期版本Kafka的朋友,可能不知道kafka.tools.GetOffsetShell工具类。但是,我敢肯定,大多数人一定知道kafka.admin.ConsumerGroupCommand工具类,也就是kafka-consumer-groups.sh。ConsumerGroupCommand类除了基本涵盖GetOffsetShell功能外,还有很多其他功能,如:消费组管理、offset重置等等。

其实,后续版本很少有人用GetOffsetShell类,绝大多数使用ConsumerGroupCommand。那么,本文的意义何在?乙方没办法啊,甲方客户非要用GetOffsetShell。甲方的诉求:GetOffsetShel不支持kerberos!!

 

GetOffsetShell类剖析

该类在Kafka源码中并未提供类似kafka-consumer-groups.sh的脚本工具。

具体使用方法见:


可见:--time -1表示查询最新offset(或者叫最大offset),time -2表示查询最旧offset(或者叫最小offset),--timetimestamp表示查询指定时间的offset。

实际样例:

    [root@felixzhbin]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-listlocalhost:9092 --topic test --time -2


    [root@felixzh bin]# ./kafka-run-class.shkafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1


    [root@felixzh bin]# ./kafka-run-class.shkafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test--time 1495245100000

    该类在源码中路径:

    kafka\core\src\main\scala\kafka\tools\GetOffsetShell.scala

    功能很简单,就是构造一个KafkaConsumer实例,从kafka查询指定Topic的信息。


    如果想支持kerberos,KafkaConsumer使用的config配置项需要传入security.protocol=SASL_PLAINTEXT。很遗憾,该类并未提供可传入的参数项,仅提供参数:

    brokerList、topic、partition、time。



    GetOffsetShell类改造

    思路:增加可传入参数项command-config,将其传入到KafkaConsumer实例config配置。详细代码如下(git diff):


    源码编译打包测试

    编译打包命令:

       $ gradle clean releaseTarGz -x test

      因为GetOffsetShell类位于kafka_2.11-2.3.0.jar,只需替换该jar包即可。

      测试结果


      GetOffsetShell类社区动态

      翻了下社区动态,早在2020年就有外国友人提出Kafka优化建议KIP-635(GetOffsetShell: support formultiple topics and consumer configuration override),指出GetOffsetShell类需要支持指定多个topic和指定consumer配置项重写。详见https://cwiki.apache.org/confluence/display/KAFKA/KIP-635%3A+GetOffsetShell%3A+support+for+multiple+topics+and+consumer+configuration+override


      目前Kafka trunk分支和即将发布的3.0版本,已经有人合入相关实现KAFKA-5235,详见https://github.com/apache/kafka/pull/9430


      本文暂不提支持多topic,只说说持指定consumer配置项重写,源码中的实现与上述我的写法类似(话说回来,本来就是很简单的功能,任何人估计都不能写出花来),如下:

      ---------------------------分割线---------------------------------------------




      完整代码详见地址:

      https://github.com/apache/kafka/blob/3.0/core/src/main/scala/kafka/tools/GetOffsetShell.scala

       


      文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论