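After the server receives a request, one of three things happens:
The message is written successfully and the server returns a normal response
An exception occurs and the server returns an error response
The server returns no response at all, and the request eventually times out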

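Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
...
pollSelectionKeys(readyKeys, false, endSelect);

1.1 The selectedKeys() method returns the set of all SelectionKeys that are ready, and pollSelectionKeys() then processes them. That method iterates over every SelectionKey and handles it according to the events it is registered for. Because the client is reading a response here, the branch of interest is the one for the read event, which calls attemptRead:

// Responses coming back from the server are handled by this method
attemptRead(key, channel);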
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
    // Read only if the channel is ready and has data to read.
    // hasStagedReceive(channel) checks whether this channel already has a
    // response that was received but has not been processed yet
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
            && !explicitlyMutedChannels.contains(channel)) {
        // Receive the server's response (in essence it is also a request);
        // a NetworkReceive represents a response sent back by the server
        NetworkReceive networkReceive;
        // Keep calling channel.read() until no complete response is available
        while ((networkReceive = channel.read()) != null) {
            // madeReadProgressLastPoll records whether the previous call to poll
            // was able to read buffered data, i.e. whether a NetworkReceive was returned
            madeReadProgressLastPoll = true;
            // Put each response that is read into the stagedReceives queue
            addToStagedReceives(channel, networkReceive);
        }
        if (channel.isMute()) {
            // The channel has run out of memory
            outOfMemory = true; //channel has muted itself due to memory pressure.
        } else {
            madeReadProgressLastPoll = true;
        }
    }
}
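As the method shows, KafkaChannel.read() returns a NetworkReceive response object. The while loop keeps reading responses, and addToStagedReceives puts each one into the stagedReceives structure.
stagedReceives is a Map from each node connection to that connection's queue of responses:
Map<KafkaChannel, Deque<NetworkReceive>>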
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    // A channel represents one network connection; each Kafka node has one channel
    /**
     * If stagedReceives already contains the channel, fetch its response queue
     * and add the NetworkReceive to it.
     * If it does not, create a new queue first and then add the response.
     */
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<>());
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
addToCompletedReceives();

private void addToCompletedReceives() {
    // A non-empty stagedReceives means responses have been received
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        // Iterate over every channel
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // Skip channels that have been explicitly muted
            if (!explicitlyMutedChannels.contains(channel)) {
                // Get the NetworkReceive queue that belongs to this KafkaChannel
                Deque<NetworkReceive> deque = entry.getValue();
                // Take one response from the queue and put it into completedReceives
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}
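The two-stage hand-off above (stage every response per channel, then promote one response per channel into the completed list) can be sketched in isolation. This is a minimal sketch, not Kafka's Selector: StagedReceivesSketch is a hypothetical class, channels and responses are plain strings, and the muted-channel check is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the selector's staging logic; channels and
// receives are plain strings here, not KafkaChannel / NetworkReceive.
final class StagedReceivesSketch {
    private final Map<String, Deque<String>> stagedReceives = new LinkedHashMap<>();
    private final List<String> completedReceives = new ArrayList<>();

    // Queue a response under its channel, creating the deque on first use.
    void addToStagedReceives(String channel, String receive) {
        stagedReceives.computeIfAbsent(channel, k -> new ArrayDeque<>()).add(receive);
    }

    // Promote one staged response per channel and drop channels whose deque is now empty.
    void addToCompletedReceives() {
        Iterator<Map.Entry<String, Deque<String>>> iter = stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Deque<String> deque = iter.next().getValue();
            // Deques are removed as soon as they are empty, so poll() never returns null here.
            completedReceives.add(deque.poll());
            if (deque.isEmpty())
                iter.remove();
        }
    }

    List<String> completedReceives() {
        return completedReceives;
    }

    int stagedChannelCount() {
        return stagedReceives.size();
    }
}
```

With two responses staged for one channel and one for another, a single call to addToCompletedReceives promotes one response from each channel and leaves the second response of the first channel staged for a later pass.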
public List<ClientResponse> poll(long timeout, long now) {
    ...
    // TODO Step 1: build a request to fetch metadata
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // TODO Step 2: send the request; the actual network I/O happens here, using Java NIO
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    long updatedNow = this.time.milliseconds();
    // Create a new list of responses
    List<ClientResponse> responses = new ArrayList<>();
    // By default this does nothing here
    handleCompletedSends(responses, updatedNow);
    // Add the ClientResponse objects that are built to the responses list
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    // Handle timed-out requests
    handleTimedOutRequests(responses, updatedNow);
    // TODO Step 3: process the responses
    // For a cluster metadata request, the response contains the cluster metadata
    completeResponses(responses);
    return responses;
}
After selector.poll returns, a list of ClientResponse objects named responses is created and handleCompletedReceives is called. Its logic is as follows:
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    // Iterate over the NetworkReceive response objects in completedReceives
    for (NetworkReceive receive : this.selector.completedReceives()) {
        // Get the broker id
        String source = receive.source();
        // Remove the request that now has a response from that node's queue in inFlightRequests
        InFlightRequest req = inFlightRequests.completeNext(source);
        // Parse the response returned by the server
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        if (log.isTraceEnabled()) {
            log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                req.header.apiKey(), req.header.correlationId(), responseStruct);
        }
        AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
        maybeThrottle(body, req.header.apiVersion(), req.destination, now);
        // TODO If this is a metadata response
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // Build a ClientResponse and add it to responses; body is the response content
            responses.add(req.completed(body, now));
    }
}
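The method works as follows:
Iterate over the response objects in the NetworkReceive collection obtained in the previous step (completedReceives)
Look up the node the response came from and remove the oldest request sent to that node from inFlightRequests (its response has now arrived)
Parse the response returned by the server, since it arrives in binary form
Build a ClientResponse object from the parsed response and add it to the responses list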
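The step that removes the oldest request for a node can be illustrated with a per-node deque. This is a minimal sketch under assumptions: InFlightSketch is a hypothetical class, requests are reduced to their correlation ids, and it relies on responses arriving on a connection in the same order the requests were sent, which is what makes "complete the oldest" safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of the in-flight bookkeeping; a request is
// represented only by its correlation id.
final class InFlightSketch {
    private final Map<String, Deque<Integer>> requests = new HashMap<>();

    // The newest request goes to the head of the node's deque.
    void add(String nodeId, int correlationId) {
        requests.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(correlationId);
    }

    // A response from nodeId completes the oldest request, which sits at the tail.
    int completeNext(String nodeId) {
        Deque<Integer> deque = requests.get(nodeId);
        if (deque == null || deque.isEmpty())
            throw new IllegalStateException("No in-flight request for node " + nodeId);
        return deque.pollLast();
    }

    // Number of requests still waiting for a response from nodeId.
    int count(String nodeId) {
        Deque<Integer> deque = requests.get(nodeId);
        return deque == null ? 0 : deque.size();
    }
}
```

After adding correlation ids 1 and 2 for the same node, the first completeNext call returns 1 and leaves one request in flight.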
handleTimedOutRequests(responses, updatedNow);

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    // Get the ids of the nodes that have timed-out requests
    List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
    for (String nodeId : nodeIds) {
        // close connection to the node
        this.selector.close(nodeId);
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);
        processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
    }
    // we disconnected, so we should probably refresh our metadata
    if (!nodeIds.isEmpty())
        metadataUpdater.requestUpdate();
}
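Get the ids of all nodes that have timed-out requests. A request counts as timed out when: current time - request creation time > request timeout (30 seconds by default)
List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
Close the client's connection to each node that has a timed-out request:
this.selector.close(nodeId);
Run the processDisconnection method: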
private void processDisconnection(List<ClientResponse> responses,
                                  String nodeId,
                                  long now,
                                  ChannelState disconnectState) {
    // Change this node's connection state to DISCONNECTED
    connectionStates.disconnected(nodeId, now);
    apiVersions.remove(nodeId);
    nodesNeedingApiVersionsFetch.remove(nodeId);
    // The state passed in is LOCAL_CLOSE, so the default branch is taken and the switch just breaks
    switch (disconnectState.state()) {
        case AUTHENTICATION_FAILED:
            AuthenticationException exception = disconnectState.exception();
            connectionStates.authenticationFailed(nodeId, now, exception);
            metadataUpdater.handleFatalException(exception);
            log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
                disconnectState.remoteAddress(), exception.getMessage());
            break;
        case AUTHENTICATE:
            log.warn("Connection to node {} ({}) terminated during authentication. This may happen " +
                "due to any of the following reasons: (1) Authentication failed due to invalid " +
                "credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS " +
                "traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
                nodeId, disconnectState.remoteAddress());
            break;
        case NOT_CONNECTED:
            log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
            break;
        default:
            break; // Disconnections in other states are logged at debug level in Selector
    }
    // Clear all in-flight requests for this node
    for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
        log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
            request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
        // isInternalRequest is false by default
        if (!request.isInternalRequest)
            // Build a response with no body and add it to the responses list
            responses.add(request.disconnected(now, disconnectState.exception()));
        else if (request.header.apiKey() == ApiKeys.METADATA)
            metadataUpdater.handleDisconnection(request.destination);
    }
}
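The method works as follows:
Change the state of the node whose connection was closed to DISCONNECTED
Clear every InFlightRequest for that node from inFlightRequests
For each of those InFlightRequests, build a ClientResponse with no response body and add it to the responses list
Request a metadata update:
metadataUpdater.requestUpdate();
At this point every request, whether the server answered it or it timed out, has been wrapped in a ClientResponse object and stored in the responses list.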
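The timeout path can be condensed into two small functions: find the nodes that have a timed-out request, then cancel everything in flight to such a node. This is a minimal sketch, assuming the timeout rule stated above (now - creation time > timeout); TimeoutSketch and its Request type are hypothetical, and a cancelled request is represented by a plain string rather than a ClientResponse.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class TimeoutSketch {
    // Hypothetical request record: destination node, creation time, timeout.
    static final class Request {
        final String nodeId;
        final long createdMs;
        final long timeoutMs;

        Request(String nodeId, long createdMs, long timeoutMs) {
            this.nodeId = nodeId;
            this.createdMs = createdMs;
            this.timeoutMs = timeoutMs;
        }

        // Timed out when the request has been outstanding longer than its timeout.
        boolean timedOut(long nowMs) {
            return nowMs - createdMs > timeoutMs;
        }
    }

    // Ids of nodes that have at least one timed-out request, each listed once.
    static List<String> nodesWithTimedOutRequests(List<Request> inFlight, long nowMs) {
        List<String> nodeIds = new ArrayList<>();
        for (Request r : inFlight)
            if (r.timedOut(nowMs) && !nodeIds.contains(r.nodeId))
                nodeIds.add(r.nodeId);
        return nodeIds;
    }

    // Remove every request sent to nodeId (timed out or not) and return one
    // body-less "disconnected" marker per removed request.
    static List<String> cancelAll(List<Request> inFlight, String nodeId) {
        List<String> responses = new ArrayList<>();
        Iterator<Request> iter = inFlight.iterator();
        while (iter.hasNext()) {
            if (iter.next().nodeId.equals(nodeId)) {
                iter.remove();
                responses.add("disconnected:" + nodeId);
            }
        }
        return responses;
    }
}
```

Note that one timed-out request is enough to cancel all requests to that node, including ones that have not yet exceeded their own timeout, because the whole connection is closed.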
1.5 In NetworkClient.poll, the completeResponses method is called to process the responses (both the responses returned by the server and the ones generated for timed-out requests)
completeResponses(responses)

private void completeResponses(List<ClientResponse> responses) {
    // Iterate over the response objects
    for (ClientResponse response : responses) {
        try {
            // Process the response
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}

public void onComplete() {
    // If a callback has been bound
    if (callback != null)
        // Call the callback's onComplete method to process the response
        callback.onComplete(this);
}
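A response that came back normally is created by calling InFlightRequest.completed
A response for a timed-out request is created by calling InFlightRequest.disconnected
Neither call passes a callback argument, so both use the callback field of the InFlightRequest itself. In other words, the callback is fixed when the InFlightRequest object is constructed, and that object is created when the request for sending messages is built. The method that creates the InFlightRequest is NetworkClient.doSend: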
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    ...
    // Wrap the request in an InFlightRequest
    InFlightRequest inFlightRequest = new InFlightRequest(
        clientRequest,
        header,
        isInternalRequest,
        request,
        send,
        now);
    ...
}

The callback itself comes from the ClientRequest, which for produce requests is built in sendProduceRequest:

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    ...
    // Build the callback for the request
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    String nodeId = Integer.toString(destination);
    // TODO Build the request that sends the data: ClientRequest
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
        requestTimeoutMs, callback);
}
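The way the callback travels from request to response can be shown with a few small types. This is a minimal sketch, not the Kafka classes: CallbackSketch, CompletionHandler, Request and Response are hypothetical names, and the response body is a plain string.

```java
import java.util.List;

final class CallbackSketch {
    // Hypothetical counterpart of RequestCompletionHandler.
    interface CompletionHandler {
        void onComplete(Response response);
    }

    static final class Response {
        final String body;          // null when the node was disconnected
        final boolean disconnected;
        private final CompletionHandler callback;

        Response(String body, boolean disconnected, CompletionHandler callback) {
            this.body = body;
            this.disconnected = disconnected;
            this.callback = callback;
        }

        // Invoke the callback only if one was bound.
        void onComplete() {
            if (callback != null)
                callback.onComplete(this);
        }
    }

    // The callback is fixed when the request is created.
    static final class Request {
        private final CompletionHandler callback;

        Request(CompletionHandler callback) {
            this.callback = callback;
        }

        // Both outcomes hand the same callback on to the response.
        Response completed(String body) {
            return new Response(body, false, callback);
        }

        Response disconnected() {
            return new Response(null, true, callback);
        }
    }

    // One failing callback must not prevent the remaining responses from completing.
    static void completeResponses(List<Response> responses) {
        for (Response response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                System.err.println("Uncaught error in request completion: " + e);
            }
        }
    }
}
```

Because the response only carries the callback it was given, the code that completes responses needs no knowledge of what kind of request produced them.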
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
    // Get the request header that belongs to the response
    RequestHeader requestHeader = response.requestHeader();
    long receivedTimeMs = response.receivedTimeMs();
    int correlationId = requestHeader.correlationId();
    // Special case: the request was about to be sent, but the connection to the broker was lost.
    // Responses for timed-out requests take this branch
    if (response.wasDisconnected()) {
        log.trace("Cancelled request with header {} due to node {} being disconnected",
            requestHeader, response.destination());
        for (ProducerBatch batch : batches.values())
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
    // The request could not be sent because of a version mismatch
    } else if (response.versionMismatch() != null) {
        log.warn("Cancelled request {} due to a version mismatch with node {}",
            response, response.destination(), response.versionMismatch());
        for (ProducerBatch batch : batches.values())
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);
    // Normal responses take this branch
    } else {
        log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
        // if we have a response, parse it
        // This is the branch taken in the normal case
        if (response.hasResponse()) {
            ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
            // Iterate over the per-partition responses. The leader replicas of several partitions
            // may live on the same node, so one request can carry batches for several partitions;
            // here we get the response for each of them
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                // Get the response for this partition
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                // Get the batch for this partition
                ProducerBatch batch = batches.get(tp);
                // Process the response for this partition
                completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
            }
            this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
        } else {
            // With acks=0 no response is expected: the client sends and does not wait for one
            for (ProducerBatch batch : batches.values()) {
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
            }
        }
    }
}
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
    // Get the error object from the response
    Errors error = response.error;
    // Message-too-large error
    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
            (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
        ...
    // Any other error
    } else if (error != Errors.NONE) {
        // If the batch can be retried
        if (canRetry(batch, response, now)) {
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                correlationId,
                batch.topicPartition,
                this.retries - batch.attempts() - 1,
                error);
            if (transactionManager == null) {
                // Put the failed batch back into the queue
                reenqueueBatch(batch, now);
            } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
                    batch.topicPartition, batch.producerId(), batch.baseSequence());
                reenqueueBatch(batch, now);
            } else {
                failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                    "batch but the producer id changed from " + batch.producerId() + " to " +
                    transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
            }
        } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
            completeBatch(batch, response);
        // The batch cannot be retried: (1) retries are not allowed, or (2) the retry limit was exceeded
        } else {
            // Build a RuntimeException instance
            final RuntimeException exception;
            // The response carries a topic authorization error
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            // The response carries a cluster authorization error
            else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
            else
                exception = error.exception();
            // Invoke the callbacks, mark the state of the batch and release its memory
            failBatch(batch, response, exception, batch.attempts() < this.retries);
        }
        // Invalid-metadata error
        if (error.exception() instanceof InvalidMetadataException) {
            // Unknown topic or partition
            if (error.exception() instanceof UnknownTopicOrPartitionException) {
                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                    "topic-partition may not exist or the user may not have Describe access to it",
                    batch.topicPartition);
            } else {
                log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                    "to request metadata update now", batch.topicPartition, error.exception().toString());
            }
            // Request a metadata update
            metadata.requestUpdate();
        }
    // No error
    } else {
        completeBatch(batch, response);
    }
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
}
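For a message-too-large error, split the oversized batch, put the pieces back into the buffer, then remove the original batch from inFlightBatches and release its memory
For a retriable error, retry: update the number of attempts made, put the batch back into its Deque in the buffer, then remove the batch from inFlightBatches
If the batch cannot be retried, wrap the error in a RuntimeException chosen by error type and call failBatch. A batch cannot be retried in two cases:
Retries are not allowed
The retry limit has been exceeded
For an invalid-metadata error, request a metadata update.
If there is no error, call the two-argument completeBatch method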
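The branching described above can be reduced to a small decision function. This is a minimal sketch under assumptions: CompleteBatchSketch and its enums are hypothetical, only NETWORK_EXCEPTION is treated as retriable, and the extra conditions of the real split check (batch not yet done, magic value or compression) are left out. The metadata refresh is independent of this decision and is not modelled.

```java
final class CompleteBatchSketch {
    // Hypothetical subset of error codes, just enough to exercise each branch.
    enum Error { NONE, MESSAGE_TOO_LARGE, NETWORK_EXCEPTION, TOPIC_AUTHORIZATION_FAILED }

    enum Action { SPLIT_AND_REENQUEUE, REENQUEUE, FAIL, COMPLETE }

    // Assumption for this sketch: only NETWORK_EXCEPTION counts as retriable.
    static boolean retriable(Error error) {
        return error == Error.NETWORK_EXCEPTION;
    }

    // Mirrors the order of the checks: split first, then retry, then fail, else complete.
    static Action decide(Error error, int recordCount, int attempts, int retries) {
        if (error == Error.MESSAGE_TOO_LARGE && recordCount > 1)
            return Action.SPLIT_AND_REENQUEUE;
        if (error != Error.NONE) {
            if (attempts < retries && retriable(error))
                return Action.REENQUEUE;
            return Action.FAIL;
        }
        return Action.COMPLETE;
    }
}
```

For example, a network error on the first attempt with retries set to 3 leads to REENQUEUE, while the same error after 3 attempts leads to FAIL.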
private void failBatch(ProducerBatch batch,
                       long baseOffset,
                       long logAppendTime,
                       RuntimeException exception,
                       boolean adjustSequenceNumbers) {
    if (transactionManager != null) {
        transactionManager.handleFailedBatch(batch, exception, adjustSequenceNumbers);
    }
    this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
    // If the batch has not been given a final state yet, set it (failed or succeeded)
    if (batch.done(baseOffset, logAppendTime, exception)) {
        // Return the memory to the buffer pool and remove the batch from inFlightBatches
        maybeRemoveAndDeallocateBatch(batch);
    }
}
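Here batch.done marks the batch with a final state. Because exception is not null, the state is FAILED, meaning the batch failed. It then iterates over the records in the batch and runs the error branch of the callback that was bound when each message was produced.
Finally, the batch is removed from inFlightBatches and the memory it occupies is released.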
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    if (transactionManager != null) {
        transactionManager.handleCompletedBatch(batch, response);
    }
    // Mark the state of the batch: succeeded
    if (batch.done(response.baseOffset, response.logAppendTime, null)) {
        // Remove the batch and release its memory
        maybeRemoveAndDeallocateBatch(batch);
    }
}
As in the failure case, the batch is then removed from inFlightBatches and the memory it occupies is released.
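Both failBatch and completeBatch rely on batch.done returning true only for the call that actually sets the final state, so the cleanup runs once. This is a minimal sketch of that idea using a compare-and-set; BatchDoneSketch and RecordCallback are hypothetical, and the sketch simply ignores later calls, which may differ from how the real ProducerBatch treats repeated completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class BatchDoneSketch {
    enum FinalState { FAILED, SUCCEEDED }

    // Hypothetical per-record callback: exception is null on success.
    interface RecordCallback {
        void onCompletion(long offset, RuntimeException exception);
    }

    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
    private final List<RecordCallback> callbacks = new ArrayList<>();

    // Register the callback that was supplied when a record was appended.
    void append(RecordCallback callback) {
        callbacks.add(callback);
    }

    // Returns true only for the call that sets the final state; later calls do nothing.
    boolean done(long baseOffset, RuntimeException exception) {
        FinalState target = exception == null ? FinalState.SUCCEEDED : FinalState.FAILED;
        if (!finalState.compareAndSet(null, target))
            return false;
        // Run every record's callback with its offset on success, or -1 and the exception on failure.
        for (int i = 0; i < callbacks.size(); i++)
            callbacks.get(i).onCompletion(exception == null ? baseOffset + i : -1L, exception);
        return true;
    }

    FinalState finalState() {
        return finalState.get();
    }
}
```

A caller would release the batch's memory only when done returns true, which is the shape of the `if (batch.done(...)) maybeRemoveAndDeallocateBatch(batch);` pattern above.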
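Summary:
Through the registered OP_READ event, the client keeps reading the responses returned by the server, wraps each one in a NetworkReceive object and puts it into the stagedReceives structure
NetworkReceive objects are taken out of stagedReceives and put into the completedReceives collection
NetworkReceive objects are taken out of completedReceives, parsed, wrapped in ClientResponse objects and added to the responses list
For a timed-out request, the network connection to the target node is closed first and the node is marked DISCONNECTED; a ClientResponse with no response body is then built and added to the responses list
Whether a request received a response or timed out, it is removed from the InFlightRequests structure in the end
The response objects in responses are iterated over and their callbacks are executed
The callback is bound when the request is built and runs different logic depending on the result:
For a retriable error, retry by putting the batch back into the buffer queue to be sent again
For a non-retriable error, wrap it in a RuntimeException, release the memory and mark the batch as FAILED
For an invalid-metadata error, request a metadata update
For a message-too-large error, split the batch and put the pieces back into the buffer queue
For a normal response, release the memory and mark the batch as SUCCEEDED
Once a batch has been given its final state, remove it from InFlightBatches, then iterate over the messages in the batch and run the callback bound when each message was produced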




