暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深度解惑:Kafka高性能机制全解

大数据从业者 2021-11-05
1390

背景

温度而知新,可以为师矣!重读官方文档,本文深度解析Kafka高性能相关的所有特性:磁盘性能、顺序读写与零拷贝、网络模型等内容。


磁盘性能

Kafka严重依赖filesystem以存储和缓存messages。大家普通认为disk速度很慢。事实上,磁盘的速度比我们想象中慢得多、也快得多,这取决于磁盘的访问模式。关于磁盘性能的关键事实:JBOD配置的RAID-5阵列(7200rpm SATA),线性写入性能达600MB/s,而随机写入性能仅100k/s,差距近6000。现代操作系统对于线性读写进行了大量优化,提供预读和后写技术(以大数据块预取、将小逻辑写分组为大的物理写)。ACM文章有相关讨论,甚至某些场景顺序磁盘访问比随机内存访问速度更快!

现代操作系统尽可能地将所有可用内存用于缓存(PageCache),因为回收内存的性能损失很小。所有磁盘读写都基于PageCache而无法绕开,除非使用direct I/O(数据直接写到磁盘上而非缓存)。有关PageCache更加详细介绍见历史文章:

https://mp.weixin.qq.com/s/nCZHa3Ex8gXtcx8i58GJig

注意filesystem与disk的关系:filesystem中的一个文件的实际大小并不等于文件所占用的disk space。filesystem最小单元为block(1KBytes),disk最小单元为allocation units(4KBytes)。所以,大小为1Byte的文件,在filesystem占用1K,而在disk上占用4K。这也是为什么有些场景下ls命令和du命令查看同一个文件显示大小不同的原因。ls命令查看的是文件信息,而du命令查看的是文件所占磁盘的信息。


顺序读写与零拷贝

如上节所述,排除磁盘访问模式的影响因素,影响效率的因素为频繁小IO操作和大量低效率的字节复制。

首先,频繁小IO操作问题存在于client与server通信时、server持久化操作时。为了避免这种情况,Kafka协议抽象出message set消息集,抽象消息集将消息进行分组分批处理,分摊网络开销而不是一次发送一条消息。Server将消息块一次性追加到Log文件,消费者一次性获取大的线性块。批处理会导致更大网络数据包、更大顺序磁盘操作、连续内存块等。这种简单的优化可以产生几个数量级的加速。

其次,为了避免大量低效率的字节复制,Kafka的生产者、服务端、消费者使用相同的标准化二进制消息格式(因此数据块来回传输时无需修改)。服务端broker以文件目录维护消息集,每个文件由一系列消息集填充,这些消息集与生产者和消费者使用相同的格式。这种通用格式有利于持久化数据文件的网络传输。

现代unix操作系统高度优化支持将数据从PageCache传输到socket,Linux系统通过调用sendfile可以实现将数据直接从PageCache数据发送到网络socket,一次数据拷贝以避免常规方式四次拷贝(磁盘->内核PageCache->user buffer->内核PageCache->socket)的低效率。这种技术就称之为零拷贝。

结合PageCache和零拷贝技术,Kafka集群消费者没有堆积时,数据直接从PageCache读取而不用读取磁盘。当然,如果消费者消费历史数据,就会先从磁盘读取到PageCache,然后再从PageCache读取到socket。

Java类库在Linux和UNIX系统上支持通过java.nio.channels.FileChannel类的transferTo()方法实现零拷贝(zero copy)技术。详细内容参见: https://developer.ibm.com/articles/j-zerocopy/


网络模型(Reactor架构)

Kafka使用Java NIO类库实现底层网络传输层和持久化数据到本地磁盘文件。NIO即Non-blocking IO,可以用来替代标准Java IO。

NIO提供了与标准IO不同的IO工作方式:

    1.标准IO基于流(字节流和字符流)进行操作,而NIO基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区或者从缓冲区写入到通道。
    2.NIO可以非阻塞的使用IO,如:当线程从通道读取数据到缓冲区时,线程还是可以做其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。
    3.NIO引入了选择器(selector)的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。

    NIO通道(Channel)分为四种:

      1.FileChannel :读写文件数据。
      2.DatagramChannel:读写UDP网络数据。
      3.SocketChannel:读写TCP网络数据。
      4.ServerSocketChannel:监听TCP连接,类似于Web服务器。对于新建连接都会创建一个对应的SocketChannel


      JavaNIO可以详见系列文章: http://tutorials.jenkov.com/java-nio/index.html

      Kafka网络模型使用到NIO Channel包括FileChannel、SocketChannel、ServerSocketChannel。结合源码中Kafka启动流程可以看到相关内容:

      启动类kafka.Kafka关键源码:

        val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
        kafkaServerStartable.startup()

        对应服务类kafkaServerStartable关键源码:

          private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)
          server.startup()

          对应服务类KafkaServer关键源码:

            socketServer = new SocketServer(config, metrics, time, credentialProvider)
            socketServer.startup(startProcessingRequests = false)

            对应服务类SocketServer关键源码:

              def startup(startProcessingRequests: Boolean = true): Unit = {
              this.synchronized {
              connectionQuotas = new ConnectionQuotas(config, time, metrics)
              createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
              createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
              if (startProcessingRequests) {
              this.startProcessingRequests()
              }
              }
              }

              详细说明SocketServer服务类,主要处理broker之间以及broker与client之间的新建连接、处理请求、返回处理结果等。Kafka支持两种类型的request planes(data-plane、control-plane)。

              data-plane处理clients和brokers之间的请求(注意:broker相对其他broker也是client)。线程模型是:一个listener(一个broker可能有多个listener)对应一个 Acceptor线程(处理连接请求,分配processor)、该Acceptor线程对应N个Processor线程(每个processor有自己的selector,从sockets读取请求数据)、M个Handler线程(处理Processors线程读取的请求数据,并返回处理结果给相应的processor线程)。

              control-plane处理controller请求,这是可选的,通过配置control.plane.listener.name指定,如果不指定,controller请求将交由data-plane处理。线程模型是:一个Acceptor线程、一个Processor线程、一个Handler线程,线程功能与data-plane相应线程功能相同。

              上述data-plane中Processors线程数由配置num.network.threads指定,见源码

                createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)

                将Processors线程关联到Acceptor线程和dataPlaneRequestChannel通道,见源码

                  private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
                  val listenerName = endpoint.listenerName
                  val securityProtocol = endpoint.securityProtocol
                  val listenerProcessors = new ArrayBuffer[Processor]()
                  for (_ <- 0 until newProcessorsPerListener) {
                  val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
                  listenerProcessors += processor
                  dataPlaneRequestChannel.addProcessor(processor)
                  nextProcessorId += 1
                  }
                  listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
                  acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
                  }

                  上述data-plane中Handler线程数由配置num.io.threads指定,见源码

                    dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
                    config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

                    KafkaRequestHandlerPool类内部会关联dataPlaneRequestChannel与Handlers线程,见源码

                       this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
                      val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
                      for (i <- 0 until numThreads) {
                      createHandler(i)
                      }




                      def createHandler(id: Int): Unit = synchronized {
                      runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
                      KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
                      }

                      至此,一个Acceptor线程(即Acceptor类)、N个Processors线程(即Processor类)、M个Handlers线程(即KafkaRequestHandler类)构建并关联完成!

                      注意:Kafka服务启动的所有线程均是通过一个封装的KafkaThread线程包装类实现,毫无疑问,该类继承了Thread类。

                        /**
                        * A wrapper for Thread that sets things up nicely
                        */
                        public class KafkaThread extends Thread {


                        private final Logger log = LoggerFactory.getLogger(getClass());

                        public static KafkaThread daemon(final String name, Runnable runnable) {
                        return new KafkaThread(name, runnable, true);
                        }


                        public static KafkaThread nonDaemon(final String name, Runnable runnable) {
                        return new KafkaThread(name, runnable, false);
                        }


                        public KafkaThread(final String name, boolean daemon) {
                        super(name);
                        configureThread(name, daemon);
                        }


                        public KafkaThread(final String name, Runnable runnable, boolean daemon) {
                        super(runnable, name);
                        configureThread(name, daemon);
                        }


                        private void configureThread(final String name, boolean daemon) {
                        setDaemon(daemon);
                        setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
                        }


                        }


                        文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论