暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

深入理解Kafka服务端之Acceptor线程是如何启动和工作的

大数据记事本 2020-11-21
865
一、场景分析
    上一篇讲到了Kafka网络通信采用了Java NIO的主从Reactor多线程模型,而Acceptor就是Kafka网络通信中很重要的一个线程对象。它通过selector接收客户端的连接请求,并交给某个Processor线程处理。那么这个Acceptor线程对象是如何创建并启动的呢?它又是如何工作的?这篇我们进行详细分析。

二、图示说明

三、源码分析

    1. Acceptor线程是如何创建和启动的
    首先,找到Kafka服务端的入口,即core模块下的src/main/scala/kafka/Kafka.scala类的main()方法。
    def main(args: Array[String]): Unit = {
    try {
    //解析参数,返回一个Properties对象
    val serverProps = getPropsFromArgs(args)
    //根据给定的Properties对象,构建一个kafkaServerStartable对象
          val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    ...
    //TODO 服务端启动核心代码
    kafkaServerStartable.startup()
    kafkaServerStartable.awaitShutdown()
    }
        ...
    }
    这里主要看kafkaServerStartable.startup()方法,里面调用了server.startup()方法。这个方法内容比较多,整个Kafka服务端的功能都在里面,我们找到和网络通信相关的代码,如下:
      def startup() {
      ...
      /TODO NIO的服务端,在这个里面创建了acceptor线程和processor线程
          // config就是解析参数获取到的KafkaConfig配置对象
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      socketServer.startup(startupProcessors = false)
      ...
      }
      socketServer.startup方法的逻辑如下,这里注意传入的startupProcessors 参数为false,所以方法中 if 条件语句中的代码不执行:
        def startup(startupProcessors: Boolean = true) {
        this.synchronized {
            //ConnectionQuotas是更新连接配额统计信息的类
        connectionQuotas = new ConnectionQuotas(config, time)
        //创建并启动处理控制类请求的Acceptor和Processor
        createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
        //创建并启动处理数据类请求的Acceptor和Processor
        // numNetworkThreads:服务端Network线程数,默认为3
        createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
        //传入startupProcessors参数为false,里面代码不执行
        if (startupProcessors) {
        startControlPlaneProcessor()
        startDataPlaneProcessors()
        }
        }
        主要看这个方法中的两行代码:
          createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
          代码a. 创建并启动处理控制类请求的Acceptor和Processor线程
            createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
            代码b. 创建并启动处理数据类请求的Acceptor和Processor线程
                那么这里的控制类请求(Control plane)和数据类请求(Data plane)指什么呢?这里我们先大概了解下:控制类请求指控制器Controller和Broker交互的请求,而数据类请求包括PRODUCER、FETCH等操作数据的请求。区分这两种请求类型主要为了区分请求的优先级。
                目前我们只分析数据类请求的处理,所以重点看上面的代码b,对应的
            createDataPlaneAcceptorsAndProcessors方法如下:
                注意:方法中的config.numNetworkThreads参数就是num.network.threads的值,即3
              private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,//每个服务对应的processor线程数,默认为3
              endpoints: Seq[EndPoint]): Unit = synchronized {
              //在kafka目录的config/server.properties文件中可以配置多个kafka服务
              //比如:
              // hadoop01:9092
              // hadoop01:9093
              // hadoop01:9094
              //那么这里的endpoints就是这多个服务的集合,而endpoint就对应hadoop01:9092,hadoop01:9093...
              //但是一般不会设置多个
              endpoints.foreach { endpoint =>
              //增加一个监听器
              connectionQuotas.addListener(config, endpoint.listenerName)
              //创建Acceptor线程
              val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
              //在Acceptor对象中添加一组processor线程
              addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
              //传入Acceptor对象,启动线程
              KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
              //阻塞线程
              dataPlaneAcceptor.awaitStartup()
              //将Acceptor对象放入集合,通常情况只有一个Acceptor对象
              dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
              info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
              }
              }
              参数中的endpoints是一个Endpoint对象的列表,Endpoint是Kafka中监听器对应的类,我们可以在server.properties配置文件中配置多个服务,这样就会有多个Endpoint对象。当然一般不会配这个参数,所以默认只有1个,我们这里也按1个Endpoint对象来分析。
              a. 创建一个Acceptor线程对象
                val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix
                我们看一下Acceptor的定义:
                  private[kafka] class Acceptor(val endPoint: EndPoint,//kafka broker的连接信息,包括主机名和端口号等
                  val sendBufferSize: Int,//设置发送数据的缓冲大小,默认100k
                  val recvBufferSize: Int,//设置接收数据的缓冲大小,默认100k,如果client和broker端的网络延迟大,建议调大这两个参数
                  brokerId: Int,
                  connectionQuotas: ConnectionQuotas,
                  metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {


                  //创建底层的NIO Selector对象
                  private val nioSelector = NSelector.open()
                  //服务端创建对应的ServerSocketChannel实例,后序会把这个Channel向上一步创建的Selector对象注册
                  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
                  //processor线程数组,Acceptor线程初始化时,会创建processor的线程池
                  private val processors = new ArrayBuffer[Processor]()
                  //标记processors是否启动
                  private val processorsStarted = new AtomicBoolean
                  private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
                  "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
                  ...
                  }

                  从定义看,创建Acceptor线程接收了6个参数,重要的主要是前3个:

                  • endpoint:broker的连接信息,其中包含主机名,端口号等
                  • sendBufferSize:设置发送数据的缓冲大小,默认100k。由broker端参数socket.send.buffer.bytes配置
                  • recvBufferSize:设置接收数据的缓冲大小,默认100k。由broker端参数socket.receive.buffer.bytes配置
                  除此之外,Acceptor还有两个重要的属性:
                  • nioSelector:Java NIO中的Selector对象
                  • processors:Processor线程池。Acceptor在初始化时,会创建对应的Processor线程池,所以Processor线程对象是由Acceptor对象管理的。
                  b. 在Acceptor对象中添加一组Processor线程对象,并启动
                    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)

                    这个方法的主要作用就是循环创建一组Processor线程对象,循环的次数由参数newProcessorsPerListener决定,即3。然后将这些对象添加到Acceptor对象的processors线程池对象中,并启动这些processor线程。

                      private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
                      val listenerName = endpoint.listenerName
                      val securityProtocol = endpoint.securityProtocol
                      val listenerProcessors = new ArrayBuffer[Processor]()
                      //循环创建processor线程,循环次数为newProcessorsPerListener(3)
                      for (_ <- 0 until newProcessorsPerListener) {
                      val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
                      //放入数组
                      listenerProcessors += processor
                      //在RequestChannel对象中添加processor线程对象
                      dataPlaneRequestChannel.addProcessor(processor)
                      nextProcessorId += 1
                      }
                      //遍历所有的processor,将其放入集合中
                      listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
                      //给Acceptor对象添加一组processor,即3个,并启动processor线程
                      acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
                      }

                      acceptor.addProcessors方法对应的代码如下:

                        private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
                        processors ++= newProcessors//添加一组新的processor线程到processors线程池
                        if (processorsStarted.get)//如果processors线程池已经启动
                        startProcessors(newProcessors, processorThreadPrefix)//启动新添加的processor线程
                        }

                        通过startProcessors方法就启动了这些Processor线程对象。

                        c. 设置Acceptor线程为非后台线程,并启动:
                          KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
                          d. 阻塞Acceptor线程,这里使用了CountDownLatch,是JDK提供的并发流程控制的工具类。初始化时会传入一个int类型的参数,即需要倒数的起始数,每调用一次countDown,这个数就减1,当减为0时,结束等待。
                            dataPlaneAcceptor.awaitStartup()
                            这里的起始数为1,即只要startupLatch变量的countDown()方法被调用,Acceptor线程就会结束等待:
                              private val startupLatch = new CountDownLatch(1)
                              这里startupComplete方法中调用了startupLatch.countDown()
                                protected def startupComplete(): Unit = {
                                // Replace the open latch with a closed one
                                shutdownLatch = new CountDownLatch(1)
                                startupLatch.countDown()
                                }
                                至此,Acceptor线程和其管理的Processor线程都已经创建完成并启动。

                                    2. Acceptor线程是如何工作的

                                    既然Acceptor是一个Runnable接口的实例对象,那么它的工作逻辑一定在run()方法中,代码如下,该方法较长,截取重要部分:
                                  def run() {
                                  //在Selector上注册一个OP_ACCEPT事件
                                  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
                                      //等待Acceptor线程启动完成
                                      startupComplete()
                                  try {
                                  //当前processor在线程池中的下标
                                  var currentProcessorIndex = 0
                                  //服务一直不断地循环
                                  while (isRunning) {
                                        try {
                                  //select方法查看是否有事件注册上来,即获取准备好的SelectionKey的数量
                                  val ready = nioSelector.select(500)
                                  if (ready > 0) {
                                  //获取SelectionKey的集合
                                  val keys = nioSelector.selectedKeys()
                                  val iter = keys.iterator()
                                  //遍历所有的SelectionKey
                                  while (iter.hasNext && isRunning) {
                                  try {
                                  val key = iter.next
                                  //移除SelectionKey
                                  iter.remove()


                                  //如果是连接事件
                                  if (key.isAcceptable) {
                                  //处理连接请求,返回一个SocketChannel
                                                    accept(key).foreach { socketChannel =>
                                  //processors就是处理器的集合,这里先获取空闲处理器的数量
                                  var retriesLeft = synchronized(processors.length)
                                  var processor: Processor = null
                                  //给上面的SocketChannel分配一个Processor线程
                                  do {
                                  //数量减1
                                  retriesLeft -= 1
                                  //分配一个processor,即指定这个链接由哪个processor线程处理
                                                        processor = synchronized {
                                  //从processors中取出一个processor,实现轮询的效果,
                                  currentProcessorIndex = currentProcessorIndex % processors.length
                                  processors(currentProcessorIndex)
                                  }
                                  currentProcessorIndex += 1
                                  //如果分配到Processor,将对应的SocketChannel对象放入newConnections集合
                                  } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                                  }
                                  } else
                                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                                  }
                                                ...
                                  }
                                  a. 在Selector上给ServerSocketChannel对象注册一个OP_ACCEPT事件,用来接收连接请求:
                                    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
                                    b. 等待Acceptor线程启动完成
                                      startupComplete()
                                      c. 不断循环,判断selector上面是否有注册的事件发生,如果有,遍历事件对应的SelectionKey
                                        while (isRunning) {
                                        try {
                                        //select方法查看是否有事件注册上来,即获取准备好的SelectionKey的数量
                                        val ready = nioSelector.select(500)
                                        if (ready > 0) {
                                        //获取SelectionKey的集合
                                        val keys = nioSelector.selectedKeys()
                                        val iter = keys.iterator()
                                        //遍历所有的SelectionKey
                                        while (iter.hasNext && isRunning) {
                                        try {
                                        val key = iter.next
                                        //移除SelectionKey
                                        iter.remove()


                                        d. 如果是连接事件,则调用accept方法,该方法内部就是一系列的Java NIO操作,最终返回一个SocketChannel对象。
                                        e. 给每一个SocketChannel对象分配一个Processor线程对象,分配策略就是通过轮询的方式,根据下标从processors线程池中获取。
                                          if (key.isAcceptable) {
                                          //处理连接请求,返回一个SocketChannel
                                          accept(key).foreach { socketChannel =>
                                          //processors就是处理器的集合,这里先获取空闲处理器的数量
                                          var retriesLeft = synchronized(processors.length)
                                          var processor: Processor = null
                                          //给上面的SocketChannel分配一个Processor线程
                                          do {
                                          //数量减1
                                          retriesLeft -= 1
                                          //分配一个processor,即指定这个链接由哪个processor线程处理
                                          processor = synchronized {
                                          //从processors中取出一个processor,实现轮询的效果,
                                          currentProcessorIndex = currentProcessorIndex % processors.length
                                          processors(currentProcessorIndex)
                                          }
                                          currentProcessorIndex += 1
                                          //如果分配到Processor,将对应的SocketChannel对象放入newConnections集合
                                                } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))  
                                          f. 如果分配到了processor线程对象,就将这个SocketChannel对象放入该processor对象的阻塞队列newConnections中:
                                          assignNewConnection方法内部调用了processor.accept方法:
                                            def accept(socketChannel: SocketChannel,
                                            mayBlock: Boolean,
                                            acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
                                            val accepted = {
                                            //向队列中放入指定的SocketChannel
                                            if (newConnections.offer(socketChannel))
                                            true
                                            //判断是否有可用的Processor线程
                                            else if (mayBlock) {
                                            val startNs = time.nanoseconds
                                            //将SocketChannel放入队列
                                            newConnections.put(socketChannel)
                                            acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
                                            true
                                            } else
                                            false
                                            }
                                              if (accepted)
                                            wakeup()
                                            accepted
                                            }
                                            通过newConnections.offer(socketChannel)方法将指定的SocketChannel放入processor的阻塞队列newConnections中。
                                            总结:
                                            • Acceptor线程是在服务端程序启动的时候创建和启动的

                                            • 每个Acceptor线程默认管理了3个Processor线程对象

                                            • Acceptor线程通过selector接收客户端的连接事件

                                            • 一旦有连接事件发生,就创建一个SocketChannel对象

                                            • 通过轮询的方式,将创建的SocketChannel对象分配给自己管理的Processor线程

                                            • 每个Processor线程有一个保存SocketChannel的阻塞队列newConnections,该队列的容量为固定的20

                                            • 综上,Acceptor线程做的事比较简单:接收客户端连接请求,创建对应的SocketChannel并轮询交给Processor线程处理

                                            文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                            评论