暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

kafka消费者--SubscriptionState类解析

我的IT技术路 2021-09-16
1237

消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQkafkarocketMQActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0

 

SubscriptionState

在上一文中我们介绍了消费者poll的实现流程,在poll的流程中,第一个需要介绍的核心类是SubscriptionState,该类主要的作用是本地维护消费者订阅的topicpartition,消息返回的监听器,offset值,消费者可以不用每次从服务器拉取消息,类似一个本地缓存的作用;另外消费者操作这些值的变更都集中到这个类中进行维护,下面我们看下这个类的源码,先了解一下围绕这个类的几个组合类关系图:

 


 

上面把几个围绕offset管理的核心类关系图描绘出来,SubscriptionState 包含PartitionStates<TopicPartitionState>TopicPartitionState包含FetchStateFetchState有四种实现FetchStates。下面看下核心成员变量:

 

//消费者订阅的模式

private enum SubscriptionType {

/**

* 无订阅

**/                                        

        NONE, 

/**

* subscribe topic模式,

**/

AUTO_TOPICS,

/**

* subscribe pattern模式

**/

AUTO_PATTERN, 

/**

* assign模式

**/

USER_ASSIGNED

    }

 

    /* 消费者订阅的模式 */

    private SubscriptionType subscriptionType;

 

    /* 如果是订阅+pattern模式,保存正则  */

    private Pattern subscribedPattern;

 

    /* 该消费者订阅的topic 列表*/

    private Set<String> subscription;

 

    /* 整个消费者组监听的topic列表 */

    private Set<String> groupSubscription;

 

    /* 用来存入消费者监听的topic、partition、offset*/

    private final PartitionStates<TopicPartitionState> assignment;

 

    /* 默认的重置策略,这个是初始化的时候写入有最早(EARLIEST)和最晚(LATEST)两种,在清空的时候会置为NONE  */

    private final OffsetResetStrategy defaultResetStrategy;

 

    /* 用户监听消费者发生reblance的监听器 */

    private ConsumerRebalanceListener rebalanceListener;

/* 用来记录消费者分配模式的版本号 */

    private int assignmentId = 0;

 

在上面的参数中,有一个相对关键的参数assignment,这个参数是一个PartitionStates类型,这个类型本身是一个模板类,在服务端也有使用。在客服端这里,传入的类型是PartitionState,这个类是offset相关变更的核心类,基本上客服端上所有和offset操作都是通过该类实现的。在PartitionStates中使用一个linkedHashmap来保存,keytopicPartitionvalueTopicPartitionState。下面我们重点看下TopicPartitionState这个类的实现。先看下该类的成员变量:

//拉取状态,这里有四种状态:INITIALIZING(正在初始化)、FETCHING(拉取中)、AWAIT_RESET(等待重置)、AWAIT_VALIDATION(等待确认)

private FetchState fetchState;

//当前已消费完offset的位置,里面包含当前leader节点、offset值、当前的offsetEpoch(这个类似与版本的概念,代表当前leader的版本号)

private FetchPosition position; 

//offset的高水位值,该值是代表所有isr已经同步的位置

private Long highWatermark; 

//offset的开始位置

private Long logStartOffset; 

//和事务相关的offset,后续在事务中统一说明

private Long lastStableOffset;

//是否用户暂停

private boolean paused;  

//重置策略

private OffsetResetStrategy resetStrategy;  

//下一次重试时间

private Long nextRetryTimeMs;

//首选的副本读取节点值

private Integer preferredReadReplica;

//首选的副本读取超期时间

private Long preferredReadReplicaExpireTimeMs;

 

上面的成员变量我们需要重点注意两个:FetchStateFetchPositionFetchState记录当前的拉取的状态,各种操作的交互都是通过FetchState来实现的。FetchState本身是一个接口类,其提供以下几个方法:

interface FetchState {

//默认的状态转换

        default FetchState transitionTo(FetchState newState) {

            if (validTransitions().contains(newState)) {

                return newState;

            } else {

                return this;

            }

        }

        //返回该状态可以转换到哪些状态

        Collection<FetchState> validTransitions();

       //判断该状态是否需要offset值

        boolean requiresPosition();

        //判断该状态是否存在有效的offset值

        boolean hasValidPosition();

    }

 

FetchState有四种状态,分别是INITIALIZINGFETCHINGAWAIT_RESETAWAIT_VALIDATION。如下是四个类可能存在的转换情况:

 

由于offset值关系到kafka消费的进度,所以在poll流程中会对offset进行详细的check,下面我们看下消费者中哪些涉及到状态的转换过程:

Subscribe流程

指定模式初始化或者协调模式,协调者分配分区信息会进入initializing

 

poll流程中状态

1)元数据发生变化,且当前主节点不存在或者主节点offset不存在,直接进入fetching,否则进入await_validation。同时发起异步校验其他的topic partition的元数据是否变更,如果获取的版本不支持await-validation,直接进入fetching。如果当前状态为await-validation,主节点版本更新,进入await_rest,否则进入fetching状态

2)如果存在initializing状态,拉取offset数据,当前节点不存在或者主节点offset不存在,直接进入fetching;否则 进入await-validation

3)如果在更新offset之后还存在initializing状态,证明服务器的offset已经过期,进入await_reset

4)发送下一次消息消息拉取的时候,会校验元数据是否变更,进入到12)步骤

 

其他的消费者移动offset的流程

1)当消费者主动seekBeginseekEnd的操作,就会进入await_reset状态

2)当消费者主动调用seek操作,就会进入await-validation状态

 

一个正常的注册到消费的状态变化和重置的状态变化如下:

 


 

上面分析消费者fetchState的状态变化,是消费者消费流程中相对复杂的一部分内容。由于这个类的函数很多,关于topic和partition的函数都比较简单,我们这里将相关的函数展示如下:

 

这里我们看下poll中使用到的hasNoSubscriptionOrUserAssignment的代码:

SubscriptionState#hasNoSubscriptionOrUserAssignment

 

public synchronized boolean hasNoSubscriptionOrUserAssignment() {

//判断是否为空的订阅状态

        return this.subscriptionType == SubscriptionType.NONE;

    }

由于和topic相关的函数相对简单,同学可自行解读。剩下的和offset相关的函数,我们列出来如下:

 


其中相对复杂的函数maybeCompleteValidation解析如下:

SubscriptionState#maybeCompleteValidation

//在异步校验该tp是否可以完成wait-validation 到fetching 状态

public synchronized Optional<LogTruncation> maybeCompleteValidation(TopicPartition tp,FetchPosition requestPosition,EpochEndOffset epochEndOffset) {

                                                                        

        //获取该tp的当前状态                                                                

        TopicPartitionState state = assignedStateOrNull(tp);

        if (state == null) {

            log.debug("Skipping completed validation for partition {} which is not currently assigned.", tp);

//如果不是wait-validation,直接返回,不用校验

        } else if (!state.awaitingValidation()) {

            log.debug("Skipping completed validation for partition {} which is no longer expecting validation.", tp);

        } else {

//如果是wait-validation状态,获取当前的offset相关的信息

            SubscriptionState.FetchPosition currentPosition = state.position;

//如果当前保存的position不是请求的position,证明当前position已经被更新,忽略校验

            if (!currentPosition.equals(requestPosition)) {

                log.debug("Skipping completed validation for partition {} since the current position {} " +

                          "no longer matches the position {} when the request was sent",

                          tp, currentPosition, requestPosition);

  //如果获取的offset和 主节点的版本不识别,进入重置状态

            } else if (epochEndOffset.endOffset() == UNDEFINED_EPOCH_OFFSET ||

                        epochEndOffset.leaderEpoch() == UNDEFINED_EPOCH) {

                if (hasDefaultOffsetResetPolicy()) {

                    log.info("Truncation detected for partition {} at offset {}, resetting offset",

                             tp, currentPosition);

                    requestOffsetReset(tp);

                } else {

                    log.warn("Truncation detected for partition {} at offset {}, but no reset policy is set",

                             tp, currentPosition);

                    return Optional.of(new LogTruncation(tp, requestPosition, Optional.empty()));

                }

//如果当前offset比主节点的offset还大,返回到服务端的offset

            } else if (epochEndOffset.endOffset() < currentPosition.offset) {

                if (hasDefaultOffsetResetPolicy()) {

                    SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(

                            epochEndOffset.endOffset(), Optional.of(epochEndOffset.leaderEpoch()),

                            currentPosition.currentLeader);

                    log.info("Truncation detected for partition {} at offset {}, resetting offset to " +

                             "the first offset known to diverge {}", tp, currentPosition, newPosition);

                    state.seekValidated(newPosition);

                } else {

                    OffsetAndMetadata divergentOffset = new OffsetAndMetadata(epochEndOffset.endOffset(),

                        Optional.of(epochEndOffset.leaderEpoch()), null);

                    log.warn("Truncation detected for partition {} at offset {} (the end offset from the " +

                             "broker is {}), but no reset policy is set", tp, currentPosition, divergentOffset);

                    return Optional.of(new LogTruncation(tp, requestPosition, Optional.of(divergentOffset)));

                }

//否则进入fetching状态

            } else {

                state.completeValidation();

            }

        }

 

        return Optional.empty();

    }

上面的主要过程是当发现元数据变更之后,需要异步校验其他offset值,主要是将符合要求的wait-validation状态转换为fetching状态。

 

本文主要介绍了消费者poll流程中的第一个类:SubscriptionState。该类主要是记录消费者的topic、partition和offset相关的值,是一个缓存的作用,其中offset的拉取状态变化时该类的核心分析重点。分析该状态的变化有助于我们理解整个拉取的过程和状态的转换,和我们平时看到的状态机有点不同的是,这里状态转换更加关注该状态可以进入到哪个状态,这是由于除了正常的拉取状态,seek相关操作也会使offset的拉取状态立马发生变更。由于该类的函数众多,且都比较简单,这里就不做详细介绍,感兴趣的同学可以自行学习。

 

本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。


文章转载自我的IT技术路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论