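Message middleware is a class of framework every backend developer needs to master, mainly because it is so widely used in internet systems. In those systems it plays several important roles: asynchronous messaging, decoupling services, peak shaving under high concurrency, distributed transactions, and so on. The mainstream options today include RabbitMQ, Kafka, RocketMQ and ActiveMQ. This series focuses on Kafka, one of the most popular of them, and later articles cover the producer, the consumer and the broker in turn. Besides being a staple in real projects, message middleware is also a near-certain interview topic. My understanding is limited, so mistakes are likely; please point them out in the comments, and thanks in advance. This series uses Kafka 2.8.0, the latest release at the time of writing.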
Broker Overview
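The previous articles covered the source code of the Kafka producer and consumer. Starting with this article, we turn to the principles and source code of the Kafka broker.

The broker handles:
- message storage, in the log package;
- cluster control, in the controller package;
- consumer group coordination, in the coordinator package;
- request processing, built on the network package.

Version 2.8 also introduces Kafka's own Raft implementation (KRaft) as an optional replacement for ZooKeeper. The zk package handles communication with ZooKeeper, and the raft package handles communication between Raft nodes. The api package holds information about the different API versions. These packages make up the core of the broker source. The figure below shows how the modules work together; each module is covered in detail in later articles.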

Starting the Broker
First, download Kafka 2.8.0. Version 2.8.0 can be started in two modes: with ZooKeeper or with Raft. We start with ZooKeeper mode.
ZooKeeper mode:
1) Edit the ZooKeeper configuration (zookeeper.properties) and start ZooKeeper.
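```properties
# ZooKeeper data directory (holds snapshots and, unless dataLogDir is set, the transaction log)
dataDir=G:\\learn\\kafka_2.12-2.8.0\\kafka_2.12-2.8.0\\log\\zookeeper
# Port that clients connect to
clientPort=2181
# Maximum concurrent connections per client IP; 0 means unlimited
maxClientCnxns=0
```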
2) Edit the broker configuration (server.properties) and start Kafka.
```properties
# The id of the broker.
broker.id=0
# The address the socket server listens on.
listeners=PLAINTEXT://localhost:9092
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma separated list of directories under which to store log files
log.dirs=G:\\learn\\kafka_2.12-2.8.0\\kafka_2.12-2.8.0\\log\\kafka-logs
# The default number of log partitions per topic.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
num.recovery.threads.per.data.dir=1
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# zk
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
# Group Coordinator Settings
group.initial.rebalance.delay.ms=0
```
Raft mode:
1) Generate a cluster ID:
bin/kafka-storage.sh random-uuid
This prints a cluster ID, for example: Hfc5JlmbTWKqLOhfJSCVwS
2) Format the storage directories:
bin/kafka-storage.sh format -t Hfc5JlmbTWKqLOhfJSCVwS -c config/kraft/server.properties
3) Start Kafka:
bin/kafka-server-start.sh config/kraft/server.properties
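For a multi-node installation, make sure every node uses the same cluster ID. The KRaft server configuration (config/kraft/server.properties) is largely similar to the server.properties shown above. Raft mode in 2.8.0 is still an early-access feature and is not recommended for production yet.
Broker Initialization
The broker starts in one of two modes, ZooKeeper or Raft. The choice is made during initialization from the process.roles setting: if it is set, the broker runs in Raft mode; otherwise it runs in ZooKeeper mode. The Kafka broker is written in Scala, which differs somewhat from Java. The initialization proceeds as follows.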
```scala
def main(args: Array[String]): Unit = {
  //...
  // Load the configuration
  val serverProps = getPropsFromArgs(args)
  // Build the server object
  val server = buildServer(serverProps)
  // Start the server
  server.startup()
  //...
}

private def buildServer(props: Properties): Server = {
  val config = KafkaConfig.fromProps(props, false)
  // ZooKeeper mode
  if (config.requiresZookeeper) {
    new KafkaServer(
      config,
      Time.SYSTEM,
      threadNamePrefix = None,
      enableForwarding = false
    )
  } else {
    // Raft mode
    new KafkaRaftServer(
      config,
      Time.SYSTEM,
      threadNamePrefix = None
    )
  }
}
```
As the code shows, the core startup logic lives in two classes: KafkaServer for ZooKeeper mode and KafkaRaftServer for Raft mode.
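The following is a minimal Scala sketch of that mode decision, under simplifying assumptions. It only checks whether process.roles is present and non-empty, mirroring the `config.requiresZookeeper` branch in buildServer. The names StartupModeSketch, ZooKeeperMode and KRaftMode are hypothetical. The real KafkaConfig also validates the role values, which this sketch skips.

```scala
import java.util.Properties

// Hypothetical, simplified model of buildServer's mode decision.
// Assumption: only the presence of a non-empty process.roles matters here.
object StartupModeSketch {
  sealed trait Mode
  case object ZooKeeperMode extends Mode
  case object KRaftMode extends Mode

  def chooseMode(props: Properties): Mode = {
    val roles = Option(props.getProperty("process.roles")).map(_.trim).getOrElse("")
    // Missing or empty process.roles => classic ZooKeeper-based broker (KafkaServer)
    if (roles.isEmpty) ZooKeeperMode else KRaftMode
  }
}
```

With an empty Properties object the sketch returns ZooKeeperMode. After `setProperty("process.roles", "broker,controller")` it returns KRaftMode.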
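KafkaServer Startup Analysis
Most of the startup work is initializing components: the log manager mentioned above, the coordinators, the cluster controller, metrics, the network layer, the ZooKeeper connection, and so on. All of them are set up when the server starts. Only after that does the network layer begin handling requests.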
```scala
override def startup(): Unit = {
  try {
    //...
    // CAS check: only proceed if no other startup is in progress
    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      // Set the broker state
      brokerState.set(BrokerState.STARTING)

      // Initialize the ZooKeeper client and register watches
      initZkClient(time)
      configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
      _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
      if (config.isFeatureVersioningSupported) {
        _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
      }

      // Get the cluster ID from ZooKeeper, or generate one
      _clusterId = getOrGenerateClusterId(zkClient)

      /* Load the broker metadata checkpoint from the log dirs; used when recovering after a shutdown */
      val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) =
        BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, ignoreMissing = true)
      //...

      /* Get or generate the broker ID */
      config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)

      // Load dynamic configuration from ZooKeeper
      config.dynamicConfig.initialize(zkClient)

      /* Start the scheduler that drives Kafka's internal periodic tasks */
      kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      kafkaScheduler.startup()

      /* Metrics */
      kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
      //...

      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

      /* Start the log manager */
      logManager = LogManager(config, initialOfflineDirs,
        new ZkConfigRepository(new AdminZkClient(zkClient)),
        kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, config.usesTopicId)
      brokerState.set(BrokerState.RECOVERY)
      logManager.startup(zkClient.getAllTopicsInCluster())

      // Create and start the socket server (request processing starts later)
      metadataCache = MetadataCache.zkMetadataCache(config.brokerId)
      tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
      credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

      if (enableForwarding) {
        this.forwardingManager = Some(ForwardingManager(
          config,
          metadataCache,
          time,
          metrics,
          threadNamePrefix
        ))
        forwardingManager.foreach(_.start())
      }

      val apiVersionManager = ApiVersionManager(
        ListenerType.ZK_BROKER,
        config,
        forwardingManager,
        brokerFeatures,
        featureCache
      )

      socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
      socketServer.startup(startProcessingRequests = false)

      /* Start the replica manager */
      alterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported) {
        AlterIsrManager(
          config = config,
          metadataCache = metadataCache,
          scheduler = kafkaScheduler,
          time = time,
          metrics = metrics,
          threadNamePrefix = threadNamePrefix,
          brokerEpochSupplier = () => kafkaController.brokerEpoch,
          config.brokerId
        )
      } else {
        AlterIsrManager(kafkaScheduler, time, zkClient)
      }
      alterIsrManager.start()

      replicaManager = createReplicaManager(isShuttingDown)
      replicaManager.startup()

      // Register the broker in ZooKeeper
      val brokerInfo = createBrokerInfo
      val brokerEpoch = zkClient.registerBroker(brokerInfo)

      // Checkpoint the broker metadata (cluster ID and broker ID) to the log dirs
      checkpointBrokerMetadata(ZkMetaProperties(clusterId, config.brokerId))

      /* Start the delegation token manager */
      tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
      tokenManager.startup()

      /* Start the Kafka controller */
      kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
      kafkaController.startup()

      adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)

      /* Start the consumer group coordinator */
      groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
      groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))

      /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
        () => new ProducerIdManager(config.brokerId, zkClient), metrics, metadataCache, Time.SYSTEM)
      transactionCoordinator.startup(
        () => zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))

      /* Start the auto topic creation manager */
      this.autoTopicCreationManager = AutoTopicCreationManager(
        config,
        metadataCache,
        time,
        metrics,
        threadNamePrefix,
        Some(adminManager),
        Some(kafkaController),
        groupCoordinator,
        transactionCoordinator,
        enableForwarding
      )
      autoTopicCreationManager.start()

      /* Authorizer */
      authorizer = config.authorizer
      authorizer.foreach(_.configure(config.originals))
      val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
        case Some(authZ) =>
          authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
            ep -> cs.toCompletableFuture
          }
        case None =>
          brokerInfo.broker.endPoints.map { ep =>
            ep.toJava -> CompletableFuture.completedFuture[Void](null)
          }.toMap
      }

      val fetchManager = new FetchManager(Time.SYSTEM,
        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

      /* Start the request handlers */
      val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
      dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
        autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
        fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
        config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

      socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
        controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
          autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
          fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

        controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
          1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
      }

      Mx4jLoader.maybeLoad()

      /* Register reconfigurable components for dynamic config updates */
      config.dynamicConfig.addReconfigurables(this)

      dynamicConfigHandlers = Map[String, ConfigHandler](
        ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
        ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
        ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
        ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers),
        ConfigType.Ip -> new IpConfigHandler(socketServer.connectionQuotas))

      /* Start the dynamic config manager, which watches ZooKeeper for changes */
      dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      // The socket server starts processing requests
      socketServer.startProcessingRequests(authorizerFutures)

      // Mark startup as successful
      brokerState.set(BrokerState.RUNNING)
      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
      info("started")
    }
  }
  catch {
    //... throw an exception on failure
  }
}
```
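Stripped of its many components, KafkaServer.startup() is a guard plus a small state machine. A CAS on isStartingUp stops startup from running twice, and brokerState moves through STARTING, RECOVERY and RUNNING. The Scala sketch below models only that pattern.

It rests on these assumptions:
- MiniServer, the State objects and the initSteps callback are hypothetical stand-ins for the real components.
- On failure, the real code shuts the server down and throws, whereas this sketch just resets the flag in a finally block.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical model of the startup guard and state transitions in KafkaServer.startup().
object StartupGuardSketch {
  sealed trait State
  case object NotRunning extends State
  case object Starting extends State
  case object Recovery extends State
  case object Running extends State

  final class MiniServer(initSteps: () => Unit) {
    private val isStartingUp = new AtomicBoolean(false)
    private val startupComplete = new AtomicBoolean(false)
    val brokerState = new AtomicReference[State](NotRunning)

    /** Returns true if this call performed the startup, false if it was skipped. */
    def startup(): Boolean = {
      if (startupComplete.get()) return false                    // already started
      if (!isStartingUp.compareAndSet(false, true)) return false // another caller is starting it
      try {
        brokerState.set(Starting)  // ZK client, scheduler, etc. would be created here
        brokerState.set(Recovery)  // the log manager loads/recovers logs in this state
        initSteps()                // stand-in for the remaining component startups
        brokerState.set(Running)
        startupComplete.set(true)
        true
      } finally {
        isStartingUp.set(false)    // simplification: always release the guard
      }
    }
  }
}
```

Calling startup() a second time returns false and does not re-run initSteps. This mirrors the early `return` when startupComplete is already set in the real method.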
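KafkaRaftServer Startup Analysis
Raft-mode startup partly resembles ZooKeeper-mode startup. In ZooKeeper mode the broker initializes a ZooKeeper client and registers watches on its data. In Raft mode it instead builds the Kafka Raft quorum and stores the cluster metadata in Raft, so the Raft client takes the place of the ZooKeeper client.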
```scala
// KafkaRaftServer.startup()
override def startup(): Unit = {
  Mx4jLoader.maybeLoad()
  // Start the Raft manager
  raftManager.startup()
  // Start the controller (present only if process.roles includes "controller")
  controller.foreach(_.startup())
  // Start the broker (present only if process.roles includes "broker")
  broker.foreach(_.startup())
  AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
  info(KafkaBroker.STARTED_MESSAGE)
}

// KafkaRaftManager.startup(): starting Raft means starting the network channel and the Raft I/O thread
def startup(): Unit = {
  //...
  netChannel.start()
  raftIoThread.start()
}

// ControllerServer.startup()
def startup(): Unit = {
  if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
  try {
    //...
    val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
      case Some(authZ) =>
        // It would be nice to remove some of the broker-specific assumptions from
        // AuthorizerServerInfo, such as the assumption that there is an inter-broker
        // listener, or that ID is named brokerId.
        val controllerAuthorizerInfo = ServerInfo(new ClusterResource(clusterId), config.nodeId, javaListeners, javaListeners.get(0))
        authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
          ep -> cs.toCompletableFuture
        }.toMap
      case None =>
        javaListeners.asScala.map {
          ep => ep -> CompletableFuture.completedFuture[Void](null)
        }.toMap
    }

    // Create the API version manager
    val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER)

    tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
    credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
    socketServer = new SocketServer(config,
      metrics,
      time,
      credentialProvider,
      apiVersionManager)
    socketServer.startup(startProcessingRequests = false, controlPlaneListener = None, config.controllerListeners)
    socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
      config.controllerListeners.head.listenerName))

    val configDefs = Map(ConfigResource.Type.BROKER -> KafkaConfig.configDef,
      ConfigResource.Type.TOPIC -> LogConfig.configDefCopy).asJava
    val threadNamePrefixAsString = threadNamePrefix.getOrElse("")

    // Build the QuorumController
    controller = new QuorumController.Builder(config.nodeId).
      setTime(time).
      setThreadNamePrefix(threadNamePrefixAsString).
      setConfigDefs(configDefs).
      setLogManager(metaLogManager).
      setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
      setDefaultNumPartitions(config.numPartitions.intValue()).
      setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
        TimeUnit.MILLISECONDS)).
      setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
      build()

    quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
    val controllerNodes =
      RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
    controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
      authorizer,
      quotaManagers,
      time,
      supportedFeatures,
      controller,
      raftManager,
      config,
      metaProperties,
      controllerNodes.toSeq,
      apiVersionManager)
    controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
      socketServer.dataPlaneRequestChannel,
      controllerApis,
      time,
      config.numIoThreads,
      s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
      SocketServer.DataPlaneThreadPrefix)

    // Start processing requests
    socketServer.startProcessingRequests(authorizerFutures)
  } catch {
    //...
    // exception handling
  }
}
```
The broker-side startup in Raft mode (BrokerServer) is largely similar to KafkaServer's, so it is not repeated here.
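The ordering in KafkaRaftServer.startup() can be modeled compactly. The Raft manager always starts first. The controller and broker start only if their role appears in process.roles; they are Options in the real code, hence `foreach(_.startup())`. This hypothetical sketch returns component names instead of starting anything, and it skips the role validation Kafka performs.

```scala
// Hypothetical model of which components KafkaRaftServer starts, and in what order.
object RaftStartupOrderSketch {
  // "broker,controller" -> Set("broker", "controller"); no validation of unknown roles
  def parseRoles(processRoles: String): Set[String] =
    processRoles.split(",").map(_.trim).filter(_.nonEmpty).toSet

  def startupOrder(processRoles: String): Seq[String] = {
    val roles = parseRoles(processRoles)
    // Option mirrors the real `controller` / `broker` fields
    val controller = if (roles.contains("controller")) Some("controller") else None
    val broker = if (roles.contains("broker")) Some("broker") else None
    Seq("raftManager") ++ controller.toSeq ++ broker.toSeq
  }
}
```

A combined node (`broker,controller`) therefore starts the Raft manager, then the controller, then the broker. A broker-only node skips the controller.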
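Key Broker Properties
The broker has a large number of properties. In day-to-day work, the following are the main ones to watch, because they are the most likely to affect cluster stability.
In server.properties (ZooKeeper mode):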
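broker.id: the unique identifier of each broker in the cluster, usually numbered upward from 0 (0, 1, 2, ...).
listeners: the addresses the broker listens on, in the form NAME://host:port. The listener usually names the security protocol: unencrypted PLAINTEXT is common, and encrypted transports such as SSL are also supported.
log.dirs: the directories where message log files are stored (comma-separated if there are several).
num.partitions: the default number of partitions for a new topic. It can be increased later on a live cluster; partition counts can be increased but not decreased.
zookeeper.connect: the ZooKeeper connection string.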
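The following hypothetical parser illustrates the NAME://host:port format of listeners. It is not Kafka's own parsing code, which handles more cases, such as IPv6 addresses in brackets. An empty host, as in `SSL://:9093`, is accepted; in Kafka an empty host means binding to the default interface.

```scala
// Hypothetical illustration of the listeners format; not Kafka's parser.
object ListenersSketch {
  final case class Listener(name: String, host: String, port: Int)

  // NAME://host:port, host may be empty
  private val Pattern = """^([A-Za-z0-9_]+)://([^:]*):(\d+)$""".r

  def parse(value: String): Seq[Listener] =
    value.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map {
      case Pattern(name, host, port) => Listener(name, host, port.toInt)
      case other => throw new IllegalArgumentException(s"Bad listener: $other")
    }
}
```

For example, `ListenersSketch.parse("PLAINTEXT://localhost:9092")` yields a single listener named PLAINTEXT on localhost port 9092, matching the server.properties shown earlier.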
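In the KRaft server.properties (config/kraft/server.properties):
process.roles: the roles this process plays: broker, controller, or both (broker,controller).
node.id: the ID of this node in its role(s), similar in purpose to broker.id.
controller.quorum.voters: the list of controller quorum voters, in the form id@host:port; it must not be empty.
listeners: the addresses the node listens on, in the form NAME://host:port; unencrypted PLAINTEXT is common, and encrypted transports such as SSL are also supported.
log.dirs: the directories where message log files are stored.
num.partitions: the default number of partitions for a new topic; it can be increased later but not decreased.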
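controller.quorum.voters takes comma-separated id@host:port entries. The hypothetical sketch below parses such a value and enforces that the list is non-empty. The duplicate-ID check is an additional assumption of this sketch. In the real code path the value feeds `RaftConfig.quorumVoterStringsToNodes`, which appears in the controller startup code above, and its exact validation may differ.

```scala
// Hypothetical parser for controller.quorum.voters, e.g. "1@localhost:9093,2@kafka2:9093".
object QuorumVotersSketch {
  final case class Voter(id: Int, host: String, port: Int)

  private val VoterPattern = """(\d+)@([^:]+):(\d+)""".r

  def parse(value: String): Map[Int, Voter] = {
    val voters = value.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map {
      case VoterPattern(id, host, port) => Voter(id.toInt, host, port.toInt)
      case other => throw new IllegalArgumentException(s"Bad voter entry: $other")
    }
    require(voters.nonEmpty, "controller.quorum.voters must not be empty")
    // Assumption of this sketch: voter IDs must be unique
    require(voters.map(_.id).distinct.size == voters.size, "duplicate voter ids")
    voters.map(v => v.id -> v).toMap
  }
}
```

In a multi-node KRaft cluster, every node would typically carry the same voters list, and each controller's node.id would be one of its IDs.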
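For other properties, see the official Kafka documentation. The publishing platform does not allow external links, so the URL is omitted here.
This article covered the broker initialization flow and a few important parameters. Starting with 2.8, Kafka ships its own distributed consensus implementation, KRaft, which is based on the Raft protocol. Future releases will likely focus on maturing KRaft so that Kafka can drop its dependency on ZooKeeper.
That's all for this article. If it helped with your learning or interview prep, a like or a share would be much appreciated. Thanks!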




