
Kafka broker: the broker initialization process

我的IT技术路 2021-10-20

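Message middleware is a class of framework that every backend developer needs to master, largely because it is used so widely in internet-scale projects. It plays an important role in these systems: asynchronous messaging, decoupling of systems, peak shaving under high concurrency, distributed transactions, and so on. The main message middleware products today are RabbitMQ, Kafka, RocketMQ, and ActiveMQ. This series covers Kafka, one of the most popular of them, and works through the producer, the consumer, and the broker in turn. Besides being a common framework in real applications, message middleware is also a staple interview topic. My own understanding is limited, so the articles will inevitably contain some mistakes; please leave a comment to point them out, and thank you in advance. This series uses Kafka 2.8.0, the latest version at the time of writing.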

 

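Broker overview

Earlier articles covered the source code of the Kafka producer and consumer. Starting with this article, we turn to the principles and source code of the Kafka broker. The broker is responsible for message storage, cluster control, consumer group management, request handling, and more. In the source tree, message storage lives in the log package, cluster control in the controller package, consumer group coordination in the coordinator package, and the low-level network layer in the network package. Version 2.8 also introduces Kafka's own Raft implementation as a replacement for ZooKeeper, so the zk package handles communication with ZooKeeper and the raft package handles communication between Raft nodes. The api package holds the API information for the different versions. These are the important parts of the broker source. The figure below shows how the modules work together; later articles describe each module in detail.

[Figure: how the broker modules work together]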

 


 

 

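Starting the broker

First, download Kafka 2.8.0 locally. Version 2.8.0 can be started in two ways: with ZooKeeper or with Raft. We look at the ZooKeeper way first.

ZooKeeper mode:

1) Edit the ZooKeeper configuration (zookeeper.properties) and start ZooKeeper.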

 

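    # ZooKeeper data directory (snapshots, and the transaction log unless dataLogDir is set)
    dataDir=G:\\learn\\kafka_2.12-2.8.0\\kafka_2.12-2.8.0\\log\\zookeeper
    # Port that clients connect to
    clientPort=2181
    # Maximum number of connections per client IP; 0 means unlimited
    maxClientCnxns=0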


     

2) Edit the broker configuration (server.properties) and start Kafka.

     

      # The id of the broker.
      broker.id=0
      # The address the socket server listens on.
      listeners=PLAINTEXT://localhost:9092
      # The number of threads that the server uses for receiving requests from the network and sending responses to the network
      num.network.threads=3
      # The number of threads that the server uses for processing requests, which may include disk I/O
      num.io.threads=8
      # The send buffer (SO_SNDBUF) used by the socket server
      socket.send.buffer.bytes=102400
      # The receive buffer (SO_RCVBUF) used by the socket server
      socket.receive.buffer.bytes=102400
      # The maximum size of a request that the socket server will accept (protection against OOM)
      socket.request.max.bytes=104857600
      # A comma separated list of directories under which to store log files
      log.dirs=G:\\learn\\kafka_2.12-2.8.0\\kafka_2.12-2.8.0\\log\\kafka-logs
      # The default number of log partitions per topic.
      num.partitions=1
      # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
      num.recovery.threads.per.data.dir=1
      # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      # The minimum age of a log file to be eligible for deletion due to age
      log.retention.hours=168
      # The maximum size of a log segment file. When this size is reached a new log segment will be created.
      log.segment.bytes=1073741824
      # The interval at which log segments are checked to see if they can be deleted according
      # to the retention policies
      log.retention.check.interval.ms=300000
      # ZooKeeper connection string and connection timeout
      zookeeper.connect=localhost:2181
      zookeeper.connection.timeout.ms=18000
      # Group Coordinator Settings
      group.initial.rebalance.delay.ms=0
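
A note on the Windows paths in the configuration above: Kafka reads server.properties with java.util.Properties rules, where a backslash is an escape character, which is why the directory separators are doubled. The following is a minimal sketch of that parsing behavior, not Kafka code; `PropsSketch`, `load`, and `sample` are hypothetical names, and the path is shortened for illustration.

```scala
import java.io.StringReader
import java.util.Properties

object PropsSketch {
  // Parse properties text the same way a .properties file is parsed.
  def load(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Raw string: the backslashes below are literal characters,
  // exactly as they would appear in the file on disk.
  val sample: String =
    """broker.id=0
      |num.partitions=1
      |log.dirs=G:\\learn\\kafka-logs
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val props = load(sample)
    // Each doubled backslash in the file becomes one backslash in the value.
    println(props.getProperty("log.dirs"))
    // Numeric settings arrive as strings and must be converted.
    println(props.getProperty("num.partitions").toInt)
  }
}
```

With a single backslash in the file, the parser silently drops it, so `G:\learn` would be read as `G:learn`. Forward slashes are a common way to sidestep the issue.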


       

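Raft mode:

1) Generate a cluster ID

    bin/kafka-storage.sh random-uuid

    # prints the cluster ID: Hfc5JlmbTWKqLOhfJSCVwS

2) Format the storage directories

    bin/kafka-storage.sh format -t Hfc5JlmbTWKqLOhfJSCVwS -c config/kraft/server.properties

3) Start Kafka:

    bin/kafka-server-start.sh config/kraft/server.properties

For a multi-node installation, make sure every node uses the same cluster ID. The KRaft server configuration is largely the same as the server configuration above. (In 2.8.0 the Raft mode is still in a testing phase and is not yet recommended for production.)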
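
To make the shape of the cluster ID concrete, here is a small sketch that produces an ID of the same form. It assumes the ID is a random 16-byte UUID written as URL-safe Base64 without padding, which matches the 22-character value above; `ClusterIdSketch` and its methods are hypothetical names, and in practice the ID should come from kafka-storage.sh.

```scala
import java.nio.ByteBuffer
import java.util.{Base64, UUID}

object ClusterIdSketch {
  // Encode the 128 bits of a UUID as URL-safe Base64 without '=' padding.
  def encode(uuid: UUID): String = {
    val buf = ByteBuffer.allocate(16)
    buf.putLong(uuid.getMostSignificantBits())
    buf.putLong(uuid.getLeastSignificantBits())
    Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array())
  }

  // Decode such a string back into a UUID.
  def decode(id: String): UUID = {
    val buf = ByteBuffer.wrap(Base64.getUrlDecoder().decode(id))
    val most = buf.getLong()
    val least = buf.getLong()
    new UUID(most, least)
  }

  // Generate a fresh random ID in the same textual form.
  def randomClusterId(): String = encode(UUID.randomUUID())

  def main(args: Array[String]): Unit = {
    val id = randomClusterId()
    println(s"$id (${id.length} characters)")
  }
}
```

Because the ID is generated once and then passed to the format step on every node, all nodes end up with the same value in their storage directories.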

             

             

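Broker initialization

The broker starts in one of two modes, ZooKeeper mode or Raft mode, and the choice is made during initialization from the process.roles setting: if it is set, the broker starts in Raft mode; otherwise it starts in ZooKeeper mode. The Kafka broker is written in Scala, which differs from Java in places. Let's walk through the whole initialization process.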

             

    def main(args: Array[String]): Unit = {
      //...
      // Load the configuration
      val serverProps = getPropsFromArgs(args)
      // Build the server object
      val server = buildServer(serverProps)
      // Start the server
      server.startup()
      //...
    }

    private def buildServer(props: Properties): Server = {
      val config = KafkaConfig.fromProps(props, false)
      // ZooKeeper mode
      if (config.requiresZookeeper) {
        new KafkaServer(
          config,
          Time.SYSTEM,
          threadNamePrefix = None,
          enableForwarding = false
        )
      } else {
        // Raft mode
        new KafkaRaftServer(
          config,
          Time.SYSTEM,
          threadNamePrefix = None
        )
      }
    }


               

As the code above shows, the core startup logic lives in the KafkaServer and KafkaRaftServer classes, which correspond to ZooKeeper mode and Raft mode respectively.
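
The mode decision can be sketched in isolation. The version below assumes that ZooKeeper is required exactly when process.roles is absent or empty; `StartupModeSketch`, `parseRoles`, and `select` are hypothetical names for illustration and do not reproduce KafkaConfig.

```scala
import java.util.Properties

object StartupModeSketch {
  sealed trait Mode
  case object ZkMode extends Mode
  case object RaftMode extends Mode

  // Read process.roles as a set of role names; a missing key yields an empty set.
  def parseRoles(props: Properties): Set[String] =
    Option(props.getProperty("process.roles")).toList
      .flatMap(_.split(",").toList)
      .map(_.trim)
      .filter(_.nonEmpty)
      .toSet

  // No roles configured means ZooKeeper mode; any role means Raft mode.
  def select(props: Properties): Mode =
    if (parseRoles(props).isEmpty) ZkMode else RaftMode

  def main(args: Array[String]): Unit = {
    val kraft = new Properties()
    kraft.setProperty("process.roles", "broker,controller")
    println(select(kraft))
    println(select(new Properties()))
  }
}
```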

               

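KafkaServer startup

Startup is mostly a matter of initializing many components: the log manager, the coordinators, the cluster controller, metrics, the network layer, the ZooKeeper connection, and so on. All of them are initialized when the server starts, and only at the end does the network layer begin handling requests.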

    override def startup(): Unit = {
      try {
        //...
        // CAS guard so that only one caller runs the startup sequence
        val canStartup = isStartingUp.compareAndSet(false, true)
        if (canStartup) {
          // Mark the broker as starting
          brokerState.set(BrokerState.STARTING)
          // Connect to ZooKeeper and register listeners
          initZkClient(time)
          configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
          _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
          if (config.isFeatureVersioningSupported) {
            _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
          }
          // Get the cluster ID from ZooKeeper, generating one if none exists
          _clusterId = getOrGenerateClusterId(zkClient)

          /* Load the broker metadata checkpoint from the log dirs; used when recovering after a shutdown */
          val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) =
            BrokerMetadataCheckpoint.getBrokerMetadataAndOfflineDirs(config.logDirs, ignoreMissing = true)
          //...
          /* Get or generate the broker ID */
          config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
          // Initialize dynamic configs from ZooKeeper
          config.dynamicConfig.initialize(zkClient)

          /* Start the scheduler that drives Kafka's internal periodic tasks */
          kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
          kafkaScheduler.startup()

          /* Metrics */
          kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
          //...
          logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

          /* Start the log manager */
          logManager = LogManager(config, initialOfflineDirs,
            new ZkConfigRepository(new AdminZkClient(zkClient)),
            kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, config.usesTopicId)
          brokerState.set(BrokerState.RECOVERY)
          logManager.startup(zkClient.getAllTopicsInCluster())

          // Create and start the socket server (it does not process requests yet)
          metadataCache = MetadataCache.zkMetadataCache(config.brokerId)
          tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
          credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

          if (enableForwarding) {
            this.forwardingManager = Some(ForwardingManager(
              config,
              metadataCache,
              time,
              metrics,
              threadNamePrefix
            ))
            forwardingManager.foreach(_.start())
          }

          val apiVersionManager = ApiVersionManager(
            ListenerType.ZK_BROKER,
            config,
            forwardingManager,
            brokerFeatures,
            featureCache
          )
          socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
          socketServer.startup(startProcessingRequests = false)

          /* Start the ISR change manager and the replica manager */
          alterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported) {
            AlterIsrManager(
              config = config,
              metadataCache = metadataCache,
              scheduler = kafkaScheduler,
              time = time,
              metrics = metrics,
              threadNamePrefix = threadNamePrefix,
              brokerEpochSupplier = () => kafkaController.brokerEpoch,
              config.brokerId
            )
          } else {
            AlterIsrManager(kafkaScheduler, time, zkClient)
          }
          alterIsrManager.start()

          replicaManager = createReplicaManager(isShuttingDown)
          replicaManager.startup()
          // Register the broker in ZooKeeper
          val brokerInfo = createBrokerInfo
          val brokerEpoch = zkClient.registerBroker(brokerInfo)

          // Checkpoint the cluster ID and broker ID to the metadata file in the log dirs
          checkpointBrokerMetadata(ZkMetaProperties(clusterId, config.brokerId))

          /* Start the delegation token manager */
          tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
          tokenManager.startup()

          /* Start the Kafka cluster controller */
          kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
          kafkaController.startup()

          adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)

          /* Start the consumer group coordinator */
          groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
          groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))

          /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
          // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
          transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
            () => new ProducerIdManager(config.brokerId, zkClient), metrics, metadataCache, Time.SYSTEM)
          transactionCoordinator.startup(
            () => zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))

          /* Start the manager that auto-creates topics */
          this.autoTopicCreationManager = AutoTopicCreationManager(
            config,
            metadataCache,
            time,
            metrics,
            threadNamePrefix,
            Some(adminManager),
            Some(kafkaController),
            groupCoordinator,
            transactionCoordinator,
            enableForwarding
          )
          autoTopicCreationManager.start()

          /* Authorizer (access control) */
          authorizer = config.authorizer
          authorizer.foreach(_.configure(config.originals))
          val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
            case Some(authZ) =>
              authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
                ep -> cs.toCompletableFuture
              }
            case None =>
              brokerInfo.broker.endPoints.map { ep =>
                ep.toJava -> CompletableFuture.completedFuture[Void](null)
              }.toMap
          }

          val fetchManager = new FetchManager(Time.SYSTEM,
            new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
              KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

          /* Set up request handling */
          val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
          dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
            autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
            fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

          dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
            config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

          socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
            controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
              autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
              fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

            controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
              1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
          }

          Mx4jLoader.maybeLoad()

          /* Register reconfigurable components and the dynamic config handlers */
          config.dynamicConfig.addReconfigurables(this)
          dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
            ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
            ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers),
            ConfigType.Ip -> new IpConfigHandler(socketServer.connectionQuotas))

          /* Start the dynamic config manager, which watches ZooKeeper for changes */
          dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
          dynamicConfigManager.startup()
          // Let the socket server start processing requests
          socketServer.startProcessingRequests(authorizerFutures)
          // Mark the broker as running now that startup has succeeded
          brokerState.set(BrokerState.RUNNING)
          shutdownLatch = new CountDownLatch(1)
          startupComplete.set(true)
          isStartingUp.set(false)
          AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
          info("started")
        }
      }
      catch {
        //... rethrow the exception on failure
      }
    }
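
Two patterns in the startup method are worth isolating: the compare-and-set guard that lets only one caller run the sequence, and the recorded state transitions (STARTING, RECOVERY, RUNNING). The sketch below is a simplified stand-in rather than KafkaServer itself; `StartOnceSketch` and its `states` buffer are hypothetical, and the real method initializes components between the state changes.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a server with a guarded startup sequence.
class StartOnceSketch {
  private val isStartingUp = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)

  // Records the states passed through, in order.
  val states: ListBuffer[String] = ListBuffer.empty[String]

  // Returns true only for the call that actually performed the startup.
  def startup(): Boolean = {
    // Already started: nothing to do.
    if (startupComplete.get()) return false
    // Another caller is in the middle of startup: back off.
    if (!isStartingUp.compareAndSet(false, true)) return false
    try {
      states += "STARTING"  // connect to dependencies, load metadata
      states += "RECOVERY"  // recover logs before serving traffic
      states += "RUNNING"   // request processing is enabled last
      startupComplete.set(true)
      true
    } finally {
      // Always release the guard, even if a step above throws.
      isStartingUp.set(false)
    }
  }
}

object StartOnceDemo {
  def main(args: Array[String]): Unit = {
    val server = new StartOnceSketch
    println(server.startup())
    println(server.startup())
    println(server.states.mkString(" -> "))
  }
}
```

Enabling request processing as the final step means clients can connect to the bound port early, but no request is handled until every dependency is ready.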


                 

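KafkaRaftServer startup

Raft-mode startup resembles ZooKeeper-mode startup in part. The core difference is that ZooKeeper mode initializes a ZooKeeper client and sets up watches on its data, whereas Raft mode builds a Kafka Raft cluster and stores the metadata in Raft. Raft-mode startup therefore starts a Raft client in place of the ZooKeeper client.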

    // KafkaRaftServer.startup()
    override def startup(): Unit = {
      Mx4jLoader.maybeLoad()
      // Start the Raft manager
      raftManager.startup()
      // Start the controller, if this node has the controller role
      controller.foreach(_.startup())
      // Start the broker, if this node has the broker role
      broker.foreach(_.startup())
      AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
      info(KafkaBroker.STARTED_MESSAGE)
    }

    // KafkaRaftManager.startup()
    def startup(): Unit = {
      // Starting Raft means starting the network channel and the Raft I/O thread
      //...
      netChannel.start()
      raftIoThread.start()
    }

    // ControllerServer.startup()
    def startup(): Unit = {
      if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
      try {
        //...
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            // It would be nice to remove some of the broker-specific assumptions from
            // AuthorizerServerInfo, such as the assumption that there is an inter-broker
            // listener, or that ID is named brokerId.
            val controllerAuthorizerInfo = ServerInfo(
              new ClusterResource(clusterId), config.nodeId, javaListeners, javaListeners.get(0))
            authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
              ep -> cs.toCompletableFuture
            }.toMap
          case None =>
            javaListeners.asScala.map {
              ep => ep -> CompletableFuture.completedFuture[Void](null)
            }.toMap
        }
        // Create the API version manager
        val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER)

        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
        socketServer = new SocketServer(config,
          metrics,
          time,
          credentialProvider,
          apiVersionManager)
        socketServer.startup(startProcessingRequests = false, controlPlaneListener = None, config.controllerListeners)
        socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
          config.controllerListeners.head.listenerName))

        val configDefs = Map(ConfigResource.Type.BROKER -> KafkaConfig.configDef,
          ConfigResource.Type.TOPIC -> LogConfig.configDefCopy).asJava
        val threadNamePrefixAsString = threadNamePrefix.getOrElse("")
        // Build the quorum controller
        controller = new QuorumController.Builder(config.nodeId).
          setTime(time).
          setThreadNamePrefix(threadNamePrefixAsString).
          setConfigDefs(configDefs).
          setLogManager(metaLogManager).
          setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
          setDefaultNumPartitions(config.numPartitions.intValue()).
          setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
            TimeUnit.MILLISECONDS)).
          setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
          build()

        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
        val controllerNodes =
          RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
        controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
          authorizer,
          quotaManagers,
          time,
          supportedFeatures,
          controller,
          raftManager,
          config,
          metaProperties,
          controllerNodes.toSeq,
          apiVersionManager)
        controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
          socketServer.dataPlaneRequestChannel,
          controllerApis,
          time,
          config.numIoThreads,
          s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
          SocketServer.DataPlaneThreadPrefix)
        // Start processing requests
        socketServer.startProcessingRequests(authorizerFutures)
      } catch {
        //...
        // exception handling
      }
    }


The broker's startup is broadly similar to KafkaServer's startup, so it is not repeated here.
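
The `controller.foreach(_.startup())` and `broker.foreach(_.startup())` calls rely on each role being an Option that is present only when the node has that role. Here is a minimal sketch of that idea, assuming the roles come from process.roles; `RaftServerSketch`, `Component`, and `startupOrder` are hypothetical names, not Kafka classes.

```scala
import scala.collection.mutable.ListBuffer

object RaftServerSketch {
  // A stand-in component that records its name when started.
  final class Component(val name: String, log: ListBuffer[String]) {
    def startup(): Unit = {
      log += name
      ()
    }
  }

  // Returns the order in which components start for the given set of roles.
  def startupOrder(roles: Set[String]): List[String] = {
    val log = ListBuffer.empty[String]
    val raftManager = new Component("raftManager", log)
    // Each role-specific component exists only if the role is configured.
    val controller =
      if (roles.contains("controller")) Some(new Component("controller", log)) else None
    val broker =
      if (roles.contains("broker")) Some(new Component("broker", log)) else None

    raftManager.startup()           // always started first
    controller.foreach(_.startup()) // no-op when the role is absent
    broker.foreach(_.startup())     // no-op when the role is absent
    log.toList
  }

  def main(args: Array[String]): Unit = {
    println(startupOrder(Set("broker", "controller")))
    println(startupOrder(Set("broker")))
  }
}
```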

                   

                   

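Core broker properties

The broker has a comparatively large number of properties. In practice, the following key settings deserve attention, since they are the ones most likely to affect cluster stability.

In server.properties:

broker.id: the identifier that must be unique for each node in the cluster; it usually starts at 0 and counts up: 0, 1, 2...

listeners: the addresses the broker listens on, together with the protocol of each; the unencrypted PLAINTEXT is common, and encrypted transports such as SSL are also supported.

log.dirs: the log path, that is, the common directory where message files are stored.

num.partitions: the default number of partitions for each newly created topic; the partition count can be increased manually later while the topic is live.

zookeeper.connect: the ZooKeeper connection address.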
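
To illustrate the listeners format, the sketch below splits a value such as PLAINTEXT://localhost:9092 into its parts. It assumes the name://host:port layout used in the configuration earlier and does not handle bracketed IPv6 hosts; `ListenerSketch` and `Listener` are hypothetical names, not Kafka classes.

```scala
object ListenerSketch {
  // One listener: its name (by default also the security protocol), host, and port.
  final case class Listener(name: String, host: String, port: Int)

  // name://host:port, where the host may be empty (bind to all interfaces).
  private val Pattern = """(\w+)://(.*):(\d+)""".r

  // Parse a comma-separated listeners value into its entries.
  def parse(value: String): List[Listener] =
    value.split(",").toList.map(_.trim).filter(_.nonEmpty).map {
      case Pattern(name, host, port) => Listener(name, host, port.toInt)
      case other =>
        throw new IllegalArgumentException(s"expected name://host:port, got '$other'")
    }

  def main(args: Array[String]): Unit = {
    parse("PLAINTEXT://localhost:9092,SSL://localhost:9093").foreach(println)
  }
}
```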

                   

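In the KRaft server.properties:

process.roles: the roles this process takes on, typically both broker and controller.

node.id: the ID of the node in the roles it plays; it serves a purpose similar to broker.id.

controller.quorum.voters: the list of controller quorum voters; it must not be empty.

listeners: the addresses the broker listens on, together with the protocol of each; the unencrypted PLAINTEXT is common, and encrypted transports such as SSL are also supported.

log.dirs: the log path, that is, the common directory where message files are stored.

num.partitions: the default number of partitions for each newly created topic; the partition count can be increased manually later while the topic is live.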
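
As an illustration of controller.quorum.voters, the sketch below parses a value in the id@host:port form (for example 1@localhost:9093, which is an assumed sample value, not one taken from this article) and checks that the list is non-empty. `QuorumVotersSketch` and `Voter` are hypothetical names, not Kafka classes.

```scala
object QuorumVotersSketch {
  final case class Voter(id: Int, host: String, port: Int)

  // Parse a comma-separated list of id@host:port entries.
  def parse(value: String): List[Voter] =
    value.split(",").toList.map(_.trim).filter(_.nonEmpty).map { entry =>
      val at = entry.indexOf('@')
      val colon = entry.lastIndexOf(':')
      require(at > 0 && colon > at + 1 && colon < entry.length - 1,
        s"expected id@host:port, got '$entry'")
      Voter(
        entry.substring(0, at).toInt,
        entry.substring(at + 1, colon),
        entry.substring(colon + 1).toInt)
    }

  // The voter list must not be empty for a Raft-mode node.
  def validate(value: String): List[Voter] = {
    val voters = parse(value)
    require(voters.nonEmpty, "controller.quorum.voters must not be empty")
    voters
  }

  def main(args: Array[String]): Unit = {
    validate("1@localhost:9093,2@localhost:9094,3@localhost:9095").foreach(println)
  }
}
```

In a multi-node setup, each node's node.id would be expected to match one of the voter IDs when that node has the controller role.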

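For the many other properties, see the official documentation (the publishing platform does not allow external links, so no URL is given here).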

                   

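This article covered the broker initialization flow and a few important parameters. Starting with version 2.8, Kafka has its own distributed consensus framework, KRaft, which is built on the Raft protocol. Later versions will likely focus on completing KRaft so that Kafka can shed its dependence on ZooKeeper.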

                   

That's all for this article. If it helped with your study or interview preparation, a like or a share would be appreciated. Thanks.


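This article is reposted from 我的IT技术路. To report suspected infringement, email contact@modb.pro with supporting evidence; once verified, 墨天轮 will remove the content promptly.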
