github.com/confluentinc/confluent-kafka-go/kafka是一个基于librdkafka 通过cgo封装的kafka客户端。回顾下cgo基础,CGO在使用C/C++资源的时候一般有三种形式:直接使用源码;链接静态库;链接动态库。直接使用源码就是在import "C"之前的注释部分包含C代码,或者在当前包中包含C/C++源文件。链接静态库和动态库的方式比较类似,都是通过在LDFLAGS选项指定要链接的库方式链接。因此可以看到confluent-kafka-go就是通过链接库的方式来引入librdkafka的:kafka/build_darwin.go
// #cgo CFLAGS: -DUSE_VENDORED_LIBRDKAFKA -DLIBRDKAFKA_STATICLIB// #cgo LDFLAGS: ${SRCDIR}/librdkafka_vendor/librdkafka_darwin.a -lm -lsasl2 -ldl -lpthreadimport "C"
它引用的就是librdkafka_vendor目录下打包好的链接库。这样的好处是你可以下载固定平台的源码包,但是对于arm芯片的M1,官方没有编译对应的链接库,所以没法使用,于是cgo提供了另一种灵活的使用系统的链接库的方式pkg-config;早期版本的包,或者使用go build -tags dynamic 用的就是这种方式kafka/build_dynamic.go
// #cgo pkg-config: rdkafkaimport "C"
由于我们开发环境有很多工具要运行,比如go test 、golang-cilint等的,每次指定build tags很不方便,我们可以修改下cgo的链接方式,用pkg-config来实现我们的目标。
配置好环境后我们初始化消费者,如果想排查问题打日志可以配置下debug参数。
func getConsumer() *kafka.Consumer {kconsumer, err := kafka.NewConsumer(&kafka.ConfigMap{"metadata.broker.list": groupList,"group.id": groupID,"client.id": hostname,"enable.auto.commit": false,"enable.auto.offset.store": false,"auto.offset.reset": "earliest","enable.partition.eof": true,"debug": "msg", // "generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all",})}
然后订阅topic
func sub(kconsumer *kafka.Consumer) {if err := kconsumer.SubscribeTopics([]string{topic}, nil); err != nil {fmt.Println(err)}}
seek偏移量
func seek(kconsumer *kafka.Consumer) {if err := kconsumer.Assign([]kafka.TopicPartition{{Topic: &topic,Partition: 0,Offset: kafka.Offset(offset),}}); err != nil {fmt.Println(err)}if err := kconsumer.Seek(kafka.TopicPartition{Topic: &topic,Partition: 0,Offset: kafka.Offset(offset),}, 400); err != nil {fmt.Println(err)}}
poll消费消息
func poll(kconsumer *kafka.Consumer){for i := 0; i < count; i++ {ev := kconsumer.Poll(int(400))as, err := kconsumer.Assignment()if len(as) > 0 {fmt.Println(as, err, int64(as[0].Offset))}fmt.Println("\033[31m", ev, "\033[m")if ev != nil {if _, ok := ev.(*kafka.Message); ok {fmt.Println("\033[31moffsets&\033[m:", ev.(*kafka.Message).TopicPartition)}}}}
注意,诡异的offset -1000 就出现在我们打印的offset中。通过多次实验终于发现,这个错误其实和订阅有关系,准确的说是和librdkafka的实现时序有关系。看下我们main函数的三种情况:
func main() {kconsumer := getConsumer()//sub(kconsumer) //情况1http.HandleFunc("/", func(http.ResponseWriter, *http.Request) {sub(kconsumer) 情况2time.Sleep(100 * time.Millisecond) //情况3seek(kconsumer)poll(kconsumer)})fmt.Println(http.ListenAndServe(":8088", nil))}
情况1:在http handler外部订阅topic,seek后poll消息没有异常
情况2:在http handler内部订阅topic,只有程序启动后第一个请求是正常的,第二个请求就会失败,虽然seek不会报错,但是poll消息的时候对应的偏移量是-1000
情况3:在了解到这个问题是由于时序问题引起的滞后,于是猜想,我是不是可以在订阅后sleep一会儿,然后seek呢?通过实验发现,果然是这样。
也就是只有在情况2会出现seek异常,那么是什么原因引起的呢?我们从源码来一一分析。
首先我们大胆怀疑是不是某种配置错误导致了偏移量被自动提交了呢,我们先看下当前的偏移量是什么样子的。
sh kafka-consumer-groups.sh -bootstrap-server localhost:9092 --group test --describeConsumer group 'test' has no active members.GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDtest test 0 5 9 4 - - -
实验了几次发现,偏移量和提交偏移量并没有变化。查看librdkafka的源码发现只有两个地方会提交偏移量src/rdkafka_assignment.c
static int rd_kafka_assignment_serve_removals(rd_kafka_t *rk) {if (valid_offsets > 0 &&rk->rk_conf.offset_store_method == RD_KAFKA_OFFSET_METHOD_BROKER &&rk->rk_cgrp && rk->rk_conf.enable_auto_commit &&!rd_kafka_destroy_flags_no_consumer_close(rk))rd_kafka_cgrp_assigned_offsets_commit(rk->rk_cgrp, rk->rk_consumer.assignment.removed,rd_false /* use offsets from .removed */,"unassigned partitions");
src/rdkafka_cgrp.c
rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,const rd_kafkap_str_t *group_id,const rd_kafkap_str_t *client_id)if (rk->rk_conf.enable_auto_commit &&rk->rk_conf.auto_commit_interval_ms > 0)rd_kafka_timer_start(&rk->rk_timers, &rkcg->rkcg_offset_commit_tmr,rk->rk_conf.auto_commit_interval_ms * 1000ll,rd_kafka_cgrp_offset_commit_tmr_cb, rkcg);
前者是发生重新指派分区的时候提交,后者是定时提交,两者都判断了变量 enable_auto_commit
自动提交的分区偏移量还需要设置enable_auto_offset_store,也有两个地方
src/rdkafka_offset.c
rd_kafka_resp_err_t rd_kafka_offset_store_stop(rd_kafka_toppar_t *rktp)if (rktp->rktp_rkt->rkt_rk->rk_conf.enable_auto_offset_store &&rktp->rktp_stored_offset == RD_KAFKA_OFFSET_INVALID &&rktp->rktp_offsets_fin.eof_offset > 0)rd_kafka_offset_store0(rktp, rktp->rktp_offsets_fin.eof_offset,rd_true /* force */, RD_DONT_LOCK);
src/rdkafka_queue.c
rd_kafka_op_res_t rd_kafka_op_handle_std(rd_kafka_t *rk,rd_kafka_q_t *rkq,rd_kafka_op_t *rko,int cb_type)rd_kafka_fetch_op_app_prepare(rk, rko);offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1;if (rk->rk_conf.enable_auto_offset_store)rd_kafka_offset_store0(rktp, offset,/* force: ignore assignment state */rd_true, RD_DONT_LOCK);z
综上可知并不是因为提交改变了offset导致的不正确。接着我们从创建消费者开始一步步分析下源码:




