消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQ、kafka、rocketMQ、ActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0。
SubscriptionState
在上一文中我们介绍了消费者poll的实现流程,在poll的流程中,第一个需要介绍的核心类是SubscriptionState,该类主要的作用是本地维护消费者订阅的topic,partition,消息返回的监听器,offset值,消费者可以不用每次从服务器拉取消息,类似一个本地缓存的作用;另外消费者操作这些值的变更都集中到这个类中进行维护,下面我们看下这个类的源码,先了解一下围绕这个类的几个组合类关系图:

上面把几个围绕offset管理的核心类关系图描绘出来,SubscriptionState 包含PartitionStates<TopicPartitionState>,TopicPartitionState包含FetchState,FetchState有四种实现FetchStates。下面看下核心成员变量:
//消费者订阅的模式
private enum SubscriptionType {
/**
* 无订阅
**/
NONE,
/**
* subscribe topic模式,
**/
AUTO_TOPICS,
/**
* subscribe pattern模式
**/
AUTO_PATTERN,
/**
* assign模式
**/
USER_ASSIGNED
}
/* 消费者订阅的模式 */
private SubscriptionType subscriptionType;
/* 如果是订阅+pattern模式,保存正则 */
private Pattern subscribedPattern;
/* 该消费者订阅的topic 列表*/
private Set<String> subscription;
/* 整个消费者组监听的topic列表 */
private Set<String> groupSubscription;
/* 用来存入消费者监听的topic、partition、offset*/
private final PartitionStates<TopicPartitionState> assignment;
/* 默认的重置策略,这个是初始化的时候写入,有最早(EARLIEST)和最晚(LATEST)两种,在清空的时候会置为NONE */
private final OffsetResetStrategy defaultResetStrategy;
/* 用户监听消费者发生reblance的监听器 */
private ConsumerRebalanceListener rebalanceListener;
/* 用来记录消费者分配模式的版本号 */
private int assignmentId = 0;
在上面的参数中,有一个相对关键的参数assignment,这个参数是一个PartitionStates类型,这个类型本身是一个模板类,在服务端也有使用。在客服端这里,传入的类型是PartitionState,这个类是offset相关变更的核心类,基本上客服端上所有和offset操作都是通过该类实现的。在PartitionStates中使用一个linkedHashmap来保存,key是topicPartition,value是TopicPartitionState。下面我们重点看下TopicPartitionState这个类的实现。先看下该类的成员变量:
//拉取状态,这里有四种状态:INITIALIZING(正在初始化)、FETCHING(拉取中)、AWAIT_RESET(等待重置)、AWAIT_VALIDATION(等待确认)
private FetchState fetchState;
//当前已消费完offset的位置,里面包含当前leader节点、offset值、当前的offsetEpoch(这个类似与版本的概念,代表当前leader的版本号)
private FetchPosition position;
//offset的高水位值,该值是代表所有isr已经同步的位置
private Long highWatermark;
//offset的开始位置
private Long logStartOffset;
//和事务相关的offset,后续在事务中统一说明
private Long lastStableOffset;
//是否用户暂停
private boolean paused;
//重置策略
private OffsetResetStrategy resetStrategy;
//下一次重试时间
private Long nextRetryTimeMs;
//首选的副本读取节点值
private Integer preferredReadReplica;
//首选的副本读取超期时间
private Long preferredReadReplicaExpireTimeMs;
上面的成员变量我们需要重点注意两个:FetchState和FetchPosition。FetchState记录当前的拉取的状态,各种操作的交互都是通过FetchState来实现的。FetchState本身是一个接口类,其提供以下几个方法:
interface FetchState {
//默认的状态转换
default FetchState transitionTo(FetchState newState) {
if (validTransitions().contains(newState)) {
return newState;
} else {
return this;
}
}
//返回该状态可以转换到哪些状态
Collection<FetchState> validTransitions();
//判断该状态是否需要offset值
boolean requiresPosition();
//判断该状态是否存在有效的offset值
boolean hasValidPosition();
}
FetchState有四种状态,分别是INITIALIZING,FETCHING,AWAIT_RESET,AWAIT_VALIDATION。如下是四个类可能存在的转换情况:

由于offset值关系到kafka消费的进度,所以在poll流程中会对offset进行详细的check,下面我们看下消费者中哪些涉及到状态的转换过程:
Subscribe流程:
指定模式初始化或者协调模式,协调者分配分区信息会进入initializing
poll流程中状态:
1)元数据发生变化,且当前主节点不存在或者主节点offset不存在,直接进入fetching,否则进入await_validation。同时发起异步校验其他的topic partition的元数据是否变更,如果获取的版本不支持await-validation,直接进入fetching。如果当前状态为await-validation,主节点版本更新,进入await_rest,否则进入fetching状态
2)如果存在initializing状态,拉取offset数据,当前节点不存在或者主节点offset不存在,直接进入fetching;否则 进入await-validation
3)如果在更新offset之后还存在initializing状态,证明服务器的offset已经过期,进入await_reset
4)发送下一次消息消息拉取的时候,会校验元数据是否变更,进入到1)2)步骤
其他的消费者移动offset的流程:
1)当消费者主动seekBegin,seekEnd的操作,就会进入await_reset状态
2)当消费者主动调用seek操作,就会进入await-validation状态
一个正常的注册到消费的状态变化和重置的状态变化如下:

上面分析消费者fetchState的状态变化,是消费者消费流程中相对复杂的一部分内容。由于这个类的函数很多,关于topic和partition的函数都比较简单,我们这里将相关的函数展示如下:

这里我们看下poll中使用到的hasNoSubscriptionOrUserAssignment的代码:
SubscriptionState#hasNoSubscriptionOrUserAssignment:
public synchronized boolean hasNoSubscriptionOrUserAssignment() {
//判断是否为空的订阅状态
return this.subscriptionType == SubscriptionType.NONE;
}
由于和topic相关的函数相对简单,同学可自行解读。剩下的和offset相关的函数,我们列出来如下:

其中相对复杂的函数maybeCompleteValidation解析如下:
SubscriptionState#maybeCompleteValidation:
//在异步校验该tp是否可以完成wait-validation 到fetching 状态
public synchronized Optional<LogTruncation> maybeCompleteValidation(TopicPartition tp,FetchPosition requestPosition,EpochEndOffset epochEndOffset) {
//获取该tp的当前状态
TopicPartitionState state = assignedStateOrNull(tp);
if (state == null) {
log.debug("Skipping completed validation for partition {} which is not currently assigned.", tp);
//如果不是wait-validation,直接返回,不用校验
} else if (!state.awaitingValidation()) {
log.debug("Skipping completed validation for partition {} which is no longer expecting validation.", tp);
} else {
//如果是wait-validation状态,获取当前的offset相关的信息
SubscriptionState.FetchPosition currentPosition = state.position;
//如果当前保存的position不是请求的position,证明当前position已经被更新,忽略校验
if (!currentPosition.equals(requestPosition)) {
log.debug("Skipping completed validation for partition {} since the current position {} " +
"no longer matches the position {} when the request was sent",
tp, currentPosition, requestPosition);
//如果获取的offset和 主节点的版本不识别,进入重置状态
} else if (epochEndOffset.endOffset() == UNDEFINED_EPOCH_OFFSET ||
epochEndOffset.leaderEpoch() == UNDEFINED_EPOCH) {
if (hasDefaultOffsetResetPolicy()) {
log.info("Truncation detected for partition {} at offset {}, resetting offset",
tp, currentPosition);
requestOffsetReset(tp);
} else {
log.warn("Truncation detected for partition {} at offset {}, but no reset policy is set",
tp, currentPosition);
return Optional.of(new LogTruncation(tp, requestPosition, Optional.empty()));
}
//如果当前offset比主节点的offset还大,返回到服务端的offset
} else if (epochEndOffset.endOffset() < currentPosition.offset) {
if (hasDefaultOffsetResetPolicy()) {
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
epochEndOffset.endOffset(), Optional.of(epochEndOffset.leaderEpoch()),
currentPosition.currentLeader);
log.info("Truncation detected for partition {} at offset {}, resetting offset to " +
"the first offset known to diverge {}", tp, currentPosition, newPosition);
state.seekValidated(newPosition);
} else {
OffsetAndMetadata divergentOffset = new OffsetAndMetadata(epochEndOffset.endOffset(),
Optional.of(epochEndOffset.leaderEpoch()), null);
log.warn("Truncation detected for partition {} at offset {} (the end offset from the " +
"broker is {}), but no reset policy is set", tp, currentPosition, divergentOffset);
return Optional.of(new LogTruncation(tp, requestPosition, Optional.of(divergentOffset)));
}
//否则进入fetching状态
} else {
state.completeValidation();
}
}
return Optional.empty();
}
上面的主要过程是当发现元数据变更之后,需要异步校验其他offset值,主要是将符合要求的wait-validation状态转换为fetching状态。
本文主要介绍了消费者poll流程中的第一个类:SubscriptionState。该类主要是记录消费者的topic、partition和offset相关的值,是一个缓存的作用,其中offset的拉取状态变化时该类的核心分析重点。分析该状态的变化有助于我们理解整个拉取的过程和状态的转换,和我们平时看到的状态机有点不同的是,这里状态转换更加关注该状态可以进入到哪个状态,这是由于除了正常的拉取状态,seek相关操作也会使offset的拉取状态立马发生变更。由于该类的函数众多,且都比较简单,这里就不做详细介绍,感兴趣的同学可以自行学习。
本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。




