
Understanding the Kafka Client in Depth: How It Fetches Cluster Metadata

大数据记事本 (Big Data Notebook), 2020-11-11

I. Scenario

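Once a Kafka producer has been initialized (the initialization flow is covered in "Kafka Source Code Analysis: The Producer Initialization Flow"), it is used to send the records we build. The sample code is again Producer.java from the examples module: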
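    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // KafkaProperties and DemoCallBack live in the same examples package
    public class Producer extends Thread {
        private final KafkaProducer<Integer, String> producer;
        private final String topic;
        private final Boolean isAsync;

        public Producer(String topic, Boolean isAsync) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Initialize the KafkaProducer
            producer = new KafkaProducer<>(props);
            this.topic = topic;
            this.isAsync = isAsync;
        }

        public void run() {
            int messageNo = 1;
            while (true) {
                String messageStr = "Message_" + messageNo;
                long startTime = System.currentTimeMillis();
                if (isAsync) { // Send asynchronously
                    // Asynchronous send; DemoCallBack is invoked when the send completes
                    producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
                }
                ...
                ++messageNo;
            }
        }
    }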
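Sending records inevitably requires the cluster metadata: how many partitions the target topic has, which node hosts the leader replica of each partition, and so on. So how does the client obtain that metadata? The rest of this article walks through the flow in detail, using a diagram and the source code.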
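To make "cluster metadata" concrete, here is a minimal, hypothetical model of the two facts this article keeps returning to: a topic's partition count and the leader of each partition. `MiniCluster` and its methods are illustrative names, not Kafka's `Cluster` class. Only two conventions are borrowed from the `waitOnMetadata` code analyzed below: an unknown topic yields a `null` partition count, and the fast-path check in `canSendWithoutWaiting` mirrors the one that lets a send skip waiting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the client's metadata cache.
final class MiniCluster {
    // topic -> leader node id of each partition (list index = partition number)
    private final Map<String, List<Integer>> leadersByTopic = new HashMap<>();

    void putTopic(String topic, List<Integer> leaderIds) {
        leadersByTopic.put(topic, new ArrayList<>(leaderIds));
    }

    // null means "this topic is not in the metadata yet"
    Integer partitionCountForTopic(String topic) {
        List<Integer> leaders = leadersByTopic.get(topic);
        return leaders == null ? null : leaders.size();
    }

    // Leader node id of a partition, or null if unknown
    Integer leaderFor(String topic, int partition) {
        List<Integer> leaders = leadersByTopic.get(topic);
        if (leaders == null || partition < 0 || partition >= leaders.size()) {
            return null;
        }
        return leaders.get(partition);
    }

    // Same condition waitOnMetadata checks before it starts waiting:
    // the topic is known and the requested partition (if any) exists.
    boolean canSendWithoutWaiting(String topic, Integer partition) {
        Integer partitionsCount = partitionCountForTopic(topic);
        return partitionsCount != null && (partition == null || partition < partitionsCount);
    }
}
```

A freshly created client is in the "topic unknown" state, which is why the first send to a topic has to wait for a metadata fetch.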

II. Metadata Fetch Flow Diagram

The focus here is the hand-off between the main thread and the Sender thread: the cluster metadata itself is fetched by the Sender thread.
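The hand-off can be reproduced with plain Java monitors. The sketch below is a simplified, hypothetical model (the class and method names echo `Metadata` but are not Kafka's classes): the main thread flags that an update is needed, records the current version, wakes a background "sender" and waits until the version moves past the recorded one; the background thread publishes new metadata, bumps the version and calls `notifyAll()`. Network I/O is replaced by a hard-coded partition count, and the `notifyAll()` in `requestUpdate` only stands in for `sender.wakeup()`, which in Kafka wakes an NIO selector rather than a monitor.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical model of the version-based hand-off between the main thread and the Sender.
final class VersionedMetadata {
    private int updateVersion = 0;
    private Integer partitionCount = null;   // null = topic not known yet
    private boolean updateRequested = false;

    // Main thread: mark that a refresh is needed and return the current version.
    synchronized int requestUpdate() {
        updateRequested = true;
        notifyAll(); // stands in for sender.wakeup()
        return updateVersion;
    }

    // Main thread: block until the version moves past lastVersion or time runs out.
    synchronized void awaitUpdate(int lastVersion, long timeoutMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (updateVersion <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("Metadata not updated within " + timeoutMs + " ms");
            }
            wait(remaining);
        }
    }

    // Sender thread: sleep until someone asks for a refresh.
    synchronized void awaitRequest() throws InterruptedException {
        while (!updateRequested) {
            wait();
        }
        updateRequested = false;
    }

    // Sender thread: publish the "fetched" metadata, bump the version, wake waiters.
    synchronized void update(int newPartitionCount) {
        partitionCount = newPartitionCount;
        updateVersion += 1;
        notifyAll();
    }

    synchronized Integer partitionCount() {
        return partitionCount;
    }
}

final class HandoffDemo {
    // One round trip: returns the partition count the main thread ends up seeing.
    static Integer fetchOnce(long timeoutMs) throws InterruptedException, TimeoutException {
        VersionedMetadata metadata = new VersionedMetadata();
        Thread sender = new Thread(() -> {
            try {
                metadata.awaitRequest();
                metadata.update(3); // pretend the broker reported 3 partitions
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "pseudo-sender");
        sender.setDaemon(true);
        sender.start();

        int version = metadata.requestUpdate();
        metadata.awaitUpdate(version, timeoutMs);
        return metadata.partitionCount();
    }
}
```

Comparing versions instead of waiting for "a notification" is what makes this race-free: even if the sender finishes its update before the main thread reaches `awaitUpdate`, the version is already greater than the recorded one and the wait returns immediately.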

III. Walking Through the Source Code

1. KafkaProducer.send ultimately calls doSend, the method through which every record the producer sends is handed off for delivery to the broker. An excerpt:

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                // TODO Step 1: wait synchronously for the metadata
                // (the main thread blocks in this method and wakes up the Sender thread)
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            ...
        } catch (ApiException e) {
            ...
        }
    }

doSend calls waitOnMetadata to wait synchronously for the metadata. This blocks the main thread until the metadata is available or the wait times out. The method:

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // Read the cached cluster metadata. On the first call it holds nothing
        // useful yet: only the nodes configured in bootstrap.servers
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);
        // Add the topic to the set of topics the metadata tracks
        metadata.add(topic);
        // Partition count of the topic. On the first call the topic is not in the
        // metadata yet, so partitionsCount is null
        Integer partitionsCount = cluster.partitionCountForTopic(topic);

        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // Loop until the topic's metadata is available or the wait times out
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            // Mark the metadata as needing an update and get the current version number
            int version = metadata.requestUpdate();
            // Wake up the Sender thread
            sender.wakeup();
            try {
                // Block until the metadata is updated. The wait ends in one of two ways:
                // 1) the wait time runs out (TimeoutException); 2) another thread updates the metadata and wakes us
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            // Getting here means the metadata has been updated: read the new snapshot
            cluster = metadata.fetch();
            // Time spent so far
            elapsed = time.milliseconds() - begin;
            // Out of time: throw
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }

            metadata.maybeThrowExceptionForTopic(topic);
            // Remaining wait time
            remainingWaitMs = maxWaitMs - elapsed;
            // Partition count according to the new snapshot
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }
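Stripped of logging and Kafka types, the control flow of waitOnMetadata is a bounded retry loop: every round spends part of a single maxWaitMs budget, and the loop only exits once the partition count is known and covers the requested partition. The sketch below is hypothetical: `awaitPartitionCount` and the `fetchWithBudget` callback (which stands in for requestUpdate + wakeup + awaitUpdate + fetch) are illustrative names, not Kafka API.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.LongFunction;

final class BoundedMetadataWait {
    // fetchWithBudget receives the remaining wait time and returns the topic's
    // partition count after one refresh attempt, or null if it is still unknown.
    static int awaitPartitionCount(Integer partition, long maxWaitMs,
                                   LongFunction<Integer> fetchWithBudget) throws TimeoutException {
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        Integer partitionsCount;
        do {
            partitionsCount = fetchWithBudget.apply(remainingWaitMs);
            long elapsed = System.currentTimeMillis() - begin;
            // Like waitOnMetadata: give up as soon as the whole budget is spent
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException("Topic not present in metadata after " + maxWaitMs + " ms.");
            }
            remainingWaitMs = maxWaitMs - elapsed;
            // Keep going while the topic is unknown, or the requested partition
            // number is beyond the partitions the metadata knows about
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
        return partitionsCount;
    }
}
```

Passing the remaining time into each round, rather than maxWaitMs again, is what keeps the total blocking time of a send bounded by a single budget no matter how many refreshes it takes.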

The method keeps trying to obtain the metadata in a do...while loop. Let's look at a few key pieces:

    // Mark the metadata as needing an update and get the current version number
    int version = metadata.requestUpdate();

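a. requestUpdate() marks the metadata as needing an update and returns the current metadata version. The version only ever increases: each time the client updates its metadata, the version is bumped as well;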

    // Wake up the Sender thread
    sender.wakeup();

b. Wakes up the Sender thread, which is the thread that actually fetches the cluster metadata;

    // Block until the metadata has been updated
    metadata.awaitUpdate(version, remainingWaitMs);

c. Blocks the main thread while it waits for the metadata update. The wait ends under one of two conditions:

• Another thread updates the metadata and wakes it up

• The wait time runs out, in which case a TimeoutException is thrown

The awaitUpdate method:

    public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
        long currentTimeMs = time.milliseconds();
        // Deadline = now + timeout (clamped to Long.MAX_VALUE if the sum overflows)
        long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
        time.waitObject(this, () -> {
            // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
            maybeThrowFatalException();
            // Done once the latest version is greater than the given version (or the metadata is closed)
            return updateVersion() > lastVersion || isClosed();
        }, deadlineMs);

        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }
The method returns normally once updateVersion() > lastVersion, i.e. once the version after the update is greater than the version recorded before waiting.
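awaitUpdate hands the actual waiting to `time.waitObject(this, condition, deadlineMs)`. Below is a JDK-only sketch of what such a helper has to do, assuming it is a standard guarded wait: re-check the condition after every wake-up (since `wait` may return spuriously) and stop at the deadline. It is written for illustration and is not Kafka's source. `deadlineMs` reproduces the overflow guard from awaitUpdate: with a huge timeout such as `Long.MAX_VALUE`, `now + timeout` wraps around to a negative number, so the deadline is clamped to "never".

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

final class GuardedWait {
    // Overflow-safe deadline, same expression as in awaitUpdate.
    static long deadlineMs(long nowMs, long timeoutMs) {
        return nowMs + timeoutMs < 0 ? Long.MAX_VALUE : nowMs + timeoutMs;
    }

    // Block on obj's monitor until the condition holds or deadlineMs passes.
    // The condition is evaluated while holding the monitor, so whoever makes it
    // true must also synchronize on obj and call notifyAll().
    static void waitObject(Object obj, BooleanSupplier condition, long deadlineMs)
            throws InterruptedException, TimeoutException {
        synchronized (obj) {
            while (!condition.getAsBoolean()) {
                long nowMs = System.currentTimeMillis();
                if (nowMs >= deadlineMs) {
                    throw new TimeoutException("Condition not satisfied before deadline");
                }
                obj.wait(deadlineMs - nowMs);
            }
        }
    }
}
```

Because awaitUpdate is `synchronized` and passes `this` as the monitor, the matching `notifyAll()` has to come from another synchronized method on the same metadata object, which is exactly what the producer's metadata update does later in this walkthrough.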

2. Since the Sender thread has been woken up to fetch the metadata, let's look at its run method, which calls runOnce:

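    void runOnce() {
        // The transaction-related code at the start is skipped here
        ...
        long currentTimeMs = time.milliseconds();
        long pollTimeout = sendProducerData(currentTimeMs);
        // TODO All actual network work is done by the NetworkClient component:
        // sending requests, receiving responses and handling responses.
        // This is the call through which the metadata is fetched
        client.poll(pollTimeout, currentTimeMs);
    }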
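client.poll is where the Sender thread spends its idle time, blocked in an NIO select call with a timeout. That is why waitOnMetadata calls `sender.wakeup()`: as far as I can tell from the client source of this era, the call is forwarded through `NetworkClient.wakeup()` and Kafka's own `Selector.wakeup()` down to `java.nio.channels.Selector.wakeup()`. The JDK-only sketch below (class and thread names are hypothetical) shows the effect: a thread blocked in `select(60_000)` returns as soon as another thread calls `wakeup()`.

```java
import java.io.IOException;
import java.nio.channels.Selector;

final class WakeupDemo {
    // Starts a thread that blocks in select(timeoutMs) on a Selector with no
    // registered channels, wakes it from the calling thread after delayMs,
    // and returns how many milliseconds the select call actually blocked.
    static long blockedMillis(long timeoutMs, long delayMs) throws IOException, InterruptedException {
        try (Selector selector = Selector.open()) {
            long[] blocked = new long[1];
            Thread pollLoop = new Thread(() -> {
                long start = System.nanoTime();
                try {
                    // Nothing is registered, so only the timeout or wakeup() ends this call
                    selector.select(timeoutMs);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                blocked[0] = (System.nanoTime() - start) / 1_000_000;
            }, "pseudo-sender");
            pollLoop.start();
            Thread.sleep(delayMs);
            selector.wakeup(); // plays the role of sender.wakeup()
            pollLoop.join();   // join() makes blocked[0] visible to this thread
            return blocked[0];
        }
    }
}
```

If `wakeup()` happens to run before the other thread has entered `select()`, the JDK makes the next `select()` return immediately, so the wake-up is not lost. The producer relies on the same property when it calls `sender.wakeup()` without knowing whether the Sender is currently inside poll.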

Here client is a NetworkClient, the component that performs the network operations. The metadata is fetched through its poll method:

    @Override
    public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();
        // abortedSends holds sends aborted because of an unsupported version or a disconnect.
        // On the first pass it is empty, so this branch is skipped
        if (!abortedSends.isEmpty()) {
            // If there are aborted sends because of unsupported version exceptions or disconnects,
            // handle them immediately without waiting for Selector#poll.
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }
        // TODO Step 1: build a metadata request if one is due
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            // TODO Step 2: send the request and perform the network I/O (built on Java NIO)
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        // Handle completed sends (requests that expect no response are completed here)
        handleCompletedSends(responses, updatedNow);
        // TODO Step 3: handle the received responses; the metadata we need arrives in one of them
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
    }

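The method works in three steps:

• Build a request for fetching metadata

• Send the request to the server and receive the response

• Handle the response and extract the cluster metadata from it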

Let's look at the code for each step.

Step 1:

    // TODO Step 1: build a metadata request if one is due
    long metadataTimeout = metadataUpdater.maybeUpdate(now);

Here metadataUpdater is a DefaultMetadataUpdater, an inner class of NetworkClient. Its public maybeUpdate(now) picks the least-loaded node and delegates to the private overload below:

    private long maybeUpdate(long now, Node node) {
        // Connection id of the node
        String nodeConnectionId = node.idString();
        // Can a request be sent on this connection right now? On the first pass the
        // connection clearly does not exist yet; on a later pass it is ready
        if (canSendRequest(nodeConnectionId, now)) {
            Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();
            this.inProgressRequestVersion = requestAndVersion.requestVersion;
            // Build a request for the metadata of the target topics
            MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
            log.debug("Sending metadata request {} to node {}", metadataRequest, node);
            // Hand the metadata request off for sending
            sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
            return defaultRequestTimeoutMs;
        }

        if (isAnyNodeConnecting()) {
            return reconnectBackoffMs;
        }
        // On the first pass there is no connection yet, so this branch runs and initiates one
        if (connectionStates.canConnect(nodeConnectionId, now)) {
            log.debug("Initialize connection to node {} for sending metadata request", node);
            // Start connecting to the node; at this point only OP_CONNECT is registered
            initiateConnect(node, now);
            return reconnectBackoffMs;
        }

        return Long.MAX_VALUE;
    }
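The branches of maybeUpdate(now, node) behave like a small state machine driven by repeated Sender polls. The sketch below is hypothetical and simplified: `ConnState`, the action strings and both constants are invented for illustration (the constants stand in for `defaultRequestTimeoutMs` and `reconnectBackoffMs`, with made-up values). It also ignores the real code's check of other nodes (`isAnyNodeConnecting`) and its `Long.MAX_VALUE` return when no connection attempt is allowed yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how successive polls move from "connect" to "send".
final class MetadataUpdaterSketch {
    enum ConnState { DISCONNECTED, CONNECTING, READY }

    static final long REQUEST_TIMEOUT_MS = 30_000;  // stand-in for defaultRequestTimeoutMs
    static final long RECONNECT_BACKOFF_MS = 50;    // stand-in for reconnectBackoffMs

    ConnState state = ConnState.DISCONNECTED;
    final List<String> actions = new ArrayList<>();

    // Called once per poll; the return value caps how long the next poll may block.
    long maybeUpdate() {
        switch (state) {
            case READY:
                actions.add("send MetadataRequest");
                return REQUEST_TIMEOUT_MS;
            case CONNECTING:
                actions.add("wait for connect");
                return RECONNECT_BACKOFF_MS;
            default:
                actions.add("initiate connect");
                state = ConnState.CONNECTING;
                return RECONNECT_BACKOFF_MS;
        }
    }

    // In Kafka this happens inside Selector.poll, when finishConnect() succeeds.
    void onConnectFinished() {
        state = ConnState.READY;
    }
}
```

The short return value while connecting matters: it limits how long the next poll blocks, so the Sender comes back soon to check whether the connection is ready and the metadata request can go out.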

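The first time this method runs (it is called on every Sender poll), there is no connection to the chosen node yet, so it initiates one. On a later pass, once the connection is ready, the following branch runs instead: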

    if (canSendRequest(nodeConnectionId, now)) {
        Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();
        this.inProgressRequestVersion = requestAndVersion.requestVersion;
        // Build a request for the metadata of the target topics
        MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
        log.debug("Sending metadata request {} to node {}", metadataRequest, node);
        // Queue the metadata request for sending
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        return defaultRequestTimeoutMs;
    }

It first obtains a builder for the MetadataRequest, and sendInternalMetadataRequest then wraps it in a ClientRequest:

    void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
        // Wrap the metadata request in a ClientRequest
        ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
        // Queue the request for sending
        doSend(clientRequest, true, now);
    }

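doSend (the three-argument overload builds the request and delegates to the overload below) then puts the request into inFlightRequests, which holds requests that have been sent but have not yet received a response: by default at most 5 per connection, as set by max.in.flight.requests.per.connection. It then hands the Send to the Selector, which attaches it to the target KafkaChannel so that a later Selector.poll writes it out. Note that this Selector is not the Java NIO Selector but Kafka's own class.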

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {
            int latestClientVersion = clientRequest.apiKey().latestVersion();
            if (header.apiVersion() == latestClientVersion) {
                log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                        clientRequest.correlationId(), destination);
            } else {
                log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                        header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
            }
        }
        Send send = request.toSend(destination, header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        // Put the metadata request into inFlightRequests, which holds requests that
        // have been sent but not yet answered (at most 5 per connection by default)
        this.inFlightRequests.add(inFlightRequest);
        // Hand the request to the Selector; it is written out during a later poll
        selector.send(send);
    }
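The inFlightRequests bookkeeping above can be modeled with a deque per node. The sketch below is hypothetical and simplified: a per-connection cap (the producer's default for `max.in.flight.requests.per.connection` is 5), new requests added at the head, and completions taken from the tail, so a response always completes the oldest outstanding request on that connection. As far as I know Kafka's `InFlightRequests` uses the same `addFirst`/`pollLast` arrangement, but the real class also requires the previous send to a node to be fully written before allowing another.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-node in-flight request tracking.
final class InFlightSketch<R> {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<R>> requests = new HashMap<>();

    InFlightSketch(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // May another request be sent to this node right now?
    boolean canSendMore(String node) {
        Deque<R> queue = requests.get(node);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    // The newest request goes to the head of the node's deque.
    void add(String node, R request) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
    }

    // A connection answers requests in the order they were sent, so a response
    // belongs to the oldest outstanding request, which sits at the tail.
    R completeNext(String node) {
        Deque<R> queue = requests.get(node);
        if (queue == null || queue.isEmpty()) {
            throw new IllegalStateException("No in-flight request for node " + node);
        }
        return queue.pollLast();
    }

    int count(String node) {
        Deque<R> queue = requests.get(node);
        return queue == null ? 0 : queue.size();
    }
}
```

The cap is per connection, not global: a full deque for one broker does not stop requests to another.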

Step 2:

    // TODO Step 2: send the request and perform the network I/O
    this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

Part of Selector.poll is shown below. This is where Java NIO comes in: nioSelector is the actual java.nio Selector object.

    @Override
    public void poll(long timeout) throws IOException {
        ...
        /* check ready keys */
        long startSelect = time.nanoseconds();
        // Number of selection keys (channels) that are ready for I/O
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            // All ready selection keys
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
            ...
            // Poll from channels where the underlying socket has more data
            // Process each ready selection key
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();
            // Also process connections that completed immediately during connect()
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        ...
    }

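The I/O itself is done by pollSelectionKeys, and received data is then moved along by addToCompletedReceives. A fuller excerpt of poll showing where both are called: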

    @Override
    public void poll(long timeout) throws IOException {
        ...
        /* check ready keys */
        long startSelect = time.nanoseconds();
        // Number of selection keys (channels) that are ready for I/O
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            // All ready selection keys
            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

            // Poll from channels that have buffered data (but nothing more from the underlying socket)
            if (dataInBuffers) {
                keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
                Set<SelectionKey> toPoll = keysWithBufferedRead;
                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
                pollSelectionKeys(toPoll, false, endSelect);
            }
            // Process each ready selection key
            pollSelectionKeys(readyKeys, false, endSelect);
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();

            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            immediatelyConnectedKeys.clear();
        } else {
            madeReadProgressLastPoll = true; //no work is also "progress"
        }
        ...
        // Move NetworkReceive objects from stagedReceives into completedReceives
        // stagedReceives: Map<KafkaChannel, Deque<NetworkReceive>>
        // completedReceives: List<NetworkReceive>
        addToCompletedReceives();
    }

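The key methods are pollSelectionKeys and addToCompletedReceives. pollSelectionKeys completes connections, writes pending sends, and receives the responses returned by the server, wrapping each one in a NetworkReceive and staging it in a data structure; addToCompletedReceives moves those NetworkReceive objects into a dedicated collection so that they can all be processed together afterwards.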

    void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
        for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
            // The KafkaChannel behind this key
            KafkaChannel channel = channel(key);
            long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
            boolean sendFailed = false;
            ...
            try {
                /* complete any connections that have finished their handshake (either normally or immediately) */
                // Connect event: take this branch
                if (isImmediatelyConnected || key.isConnectable()) {
                    /**
                     * TODO Core code
                     * Completes the network connection: if the connection was not yet
                     * established when it was initiated, it is completed here
                     */
                    if (channel.finishConnect()) {
                        // Connected: add the node's connection id to the set of connected channels
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                socketChannel.socket().getReceiveBufferSize(),
                                socketChannel.socket().getSendBufferSize(),
                                socketChannel.socket().getSoTimeout(),
                                channel.id());
                    } else {
                        continue;
                    }
                }
                ...
                // Read any responses the server has sent back
                attemptRead(key, channel);
                ...
                // Writable channel with a pending send: take this branch
                if (channel.ready() && key.isWritable() && !channel.maybeBeginClientReauthentication(
                        () -> channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos)) {
                    Send send;
                    try {
                        // TODO Write the request to the server; once the whole Send
                        // has been written, interest in OP_WRITE is removed
                        send = channel.write();
                    } catch (Exception e) {
                        sendFailed = true;
                        throw e;
                    }
                    if (send != null) {
                        // TODO Record the fully written send in completedSends
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                ...
            } catch (Exception e) {
                ...
            }
        }
    }
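The connect branch follows the standard non-blocking connect pattern of java.nio: `connect()` starts the TCP handshake, the key is registered for `OP_CONNECT`, and `finishConnect()` completes the connection once the key becomes connectable. The JDK-only sketch below reproduces that sequence against a throwaway local listener. The class name is hypothetical, and swapping `OP_CONNECT` for `OP_READ` after connecting reflects what, as far as I know, Kafka's plaintext transport layer does in its own `finishConnect()`.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectDemo {
    // Opens a non-blocking connection to a local listener and completes it via
    // OP_CONNECT + finishConnect(). Returns true once the connection is established.
    static boolean connectLocally(long timeoutMs) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
            client.configureBlocking(false);

            // A non-blocking connect() usually returns false: the handshake is still in
            // progress. If it returns true the connection completed immediately (Kafka
            // keeps such keys in immediatelyConnectedKeys).
            if (client.connect(server.getLocalAddress())) {
                return true;
            }
            SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);

            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                selector.select(100);
                if (key.isConnectable() && client.finishConnect()) {
                    // Connected: stop watching OP_CONNECT and start watching OP_READ,
                    // so the channel can now be polled for responses
                    key.interestOps(SelectionKey.OP_READ);
                    return true;
                }
                selector.selectedKeys().clear();
            }
            return false;
        }
    }
}
```

The listener never calls `accept()`; the operating system completes the handshake on its backlog anyway, which is enough for the client side to observe a finished connection.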

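Responses are received in attemptRead(key, channel). If the channel is ready and readable, it keeps reading from the channel and adds each NetworkReceive to the stagedReceives structure, a Map whose key is the KafkaChannel and whose value is a queue of NetworkReceive objects.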

    private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
        // The channel is ready and has data to read
        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
                && !explicitlyMutedChannels.contains(channel)) {
            // Receive the server's responses: each NetworkReceive is one
            // complete response returned by the server
            NetworkReceive networkReceive;
            while ((networkReceive = channel.read()) != null) {
                madeReadProgressLastPoll = true;
                // Keep reading, staging each complete response in stagedReceives
                addToStagedReceives(channel, networkReceive);
            }
            if (channel.isMute()) {
                outOfMemory = true; //channel has muted itself due to memory pressure.
            } else {
                madeReadProgressLastPoll = true;
            }
        }
    }
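addToCompletedReceives, in turn, moves the NetworkReceive objects staged in stagedReceives into the completedReceives list (a List<NetworkReceive>), which NetworkClient processes next.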
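A hypothetical sketch of this two-stage hand-off between attemptRead and addToCompletedReceives, with channel ids and payloads reduced to strings. It assumes the behavior of the Kafka version analyzed here where, as far as I can tell, each call to addToCompletedReceives moves at most one staged receive per channel and skips muted channels; treat that detail as an assumption rather than a guarantee for other versions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of stagedReceives -> completedReceives.
final class ReceiveStaging {
    // channel id -> responses read from that channel, oldest first
    final Map<String, Deque<String>> stagedReceives = new LinkedHashMap<>();
    final List<String> completedReceives = new ArrayList<>();
    final Set<String> mutedChannels = new HashSet<>();

    // What attemptRead does for every complete response it reads.
    void addToStagedReceives(String channel, String receive) {
        stagedReceives.computeIfAbsent(channel, c -> new ArrayDeque<>()).add(receive);
    }

    // Move at most one receive per unmuted channel into completedReceives,
    // dropping a channel's entry once its deque is empty.
    void addToCompletedReceives() {
        Iterator<Map.Entry<String, Deque<String>>> it = stagedReceives.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<String>> entry = it.next();
            if (mutedChannels.contains(entry.getKey())) {
                continue;
            }
            Deque<String> deque = entry.getValue();
            completedReceives.add(deque.poll());
            if (deque.isEmpty()) {
                it.remove();
            }
        }
    }
}
```

Handing out responses in per-channel order, one at a time, keeps each connection's responses strictly ordered, which the in-flight matching in step 3 depends on.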

Step 3:

    // TODO Step 3: handle the received responses; the metadata we need arrives in one of them
    handleCompletedReceives(responses, updatedNow);

handleCompletedReceives is shown below. For a metadata response it calls handleCompletedMetadataResponse:

    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        // Iterate over the NetworkReceives in completedReceives
        for (NetworkReceive receive : this.selector.completedReceives()) {
            // Id of the broker that sent the response
            String source = receive.source();
            // The oldest in-flight request to that broker: a connection returns
            // responses in the order the requests were sent
            InFlightRequest req = inFlightRequests.completeNext(source);
            // Parse the response returned by the server
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                    throttleTimeSensor, now);
            ...
            // TODO A metadata response
            if (req.isInternalRequest && body instanceof MetadataResponse)
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
                responses.add(req.completed(body, now));
        }
    }

An excerpt of handleCompletedMetadataResponse, whose main job is to update the metadata:

    public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
        ...
        if (response.brokers().isEmpty()) {
            log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
            this.metadata.failedUpdate(now, null);
        } else {
            // The response contains broker information, so update the metadata.
            // TODO Note that this calls ProducerMetadata.update, which wakes
            // the waiting thread with notifyAll()
            this.metadata.update(inProgressRequestVersion, response, now);
        }

        inProgressRequestVersion = null;
    }

Now look at Metadata.update, which ProducerMetadata.update calls before notifying the waiting thread:

    public synchronized void update(int requestVersion, MetadataResponse response, long now) {
        Objects.requireNonNull(response, "Metadata response cannot be null");
        if (isClosed())
            throw new IllegalStateException("Update requested after metadata close");

        if (requestVersion == this.requestVersion)
            this.needUpdate = false;
        else
            requestUpdate();

        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        // Every metadata update increments the version by 1
        this.updateVersion += 1;

        String previousClusterId = cache.cluster().clusterResource().clusterId();

        this.cache = handleMetadataResponse(response, topic -> retainTopic(topic.topic(), topic.isInternal(), now));
        // The cluster metadata built from the response
        Cluster cluster = cache.cluster();
        maybeSetMetadataError(cluster);

        this.lastSeenLeaderEpochs.keySet().removeIf(tp -> !retainTopic(tp.topic(), false, now));

        String newClusterId = cache.cluster().clusterResource().clusterId();
        if (!Objects.equals(previousClusterId, newClusterId)) {
            log.info("Cluster ID: {}", newClusterId);
        }
        // Notify every listener interested in cluster metadata updates
        clusterResourceListeners.onUpdate(cache.cluster().clusterResource());

        log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
    }

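The key step is this.updateVersion += 1, which bumps the metadata version. The new version is now greater than the one recorded before the update, so the blocked awaitUpdate call returns and execution continues with the rest of KafkaProducer.waitOnMetadata: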

    // Getting here means the metadata has been updated: read the new snapshot
    cluster = metadata.fetch();
    ...
    // Read the partition count again
    partitionsCount = cluster.partitionCountForTopic(topic);
Once the re-read partition count is non-null (and covers the requested partition, if one was specified), the do...while loop exits.

At this point the client has the cluster metadata, and KafkaProducer.doSend continues with the rest of its logic to send the data to the server.


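Summary:

• Kafka fetches cluster metadata on the Sender thread

• While the metadata is being fetched, the main thread blocks until it gets the metadata or the wait times out

• NetworkClient is Kafka's network component; fetching cluster metadata involves building the request, sending it, and handling the response

• The metadata version number plays a crucial role: the main thread knows the update has happened because the version has moved past the one it recorded

• Kafka's network communication is built on Java NIO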


Reprinted from 大数据记事本.
