暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Tomcat如何实现异步IO(下)

Alleria Windrunner 2021-08-30
1120
上一篇我们介绍了异步IO以及Java中的NIO2,本篇我们一起来看一下Tomcat中是如何实现异步IO的。

Nio2Endpoint

掌握了 Java NIO.2 API 的使用以及服务端程序的工作原理之后,再来理解 Tomcat 的异步 I/O 实现就不难了。我们先通过一张图来看看 Nio2Endpoint 有哪些组件。


从图上看,总体工作流程跟 NioEndpoint 是相似的。


LimitLatch 是连接控制器,它负责控制最大连接数。


Nio2Acceptor 扩展了 Acceptor,用异步 I/O 的方式来接收连接,跑在一个单独的线程里,也是一个线程组。Nio2Acceptor 接收新的连接后,得到一个 AsynchronousSocketChannel,Nio2Acceptor 把 AsynchronousSocketChannel 封装成一个 Nio2SocketWrapper,并创建一个 SocketProcessor 任务类交给线程池处理,并且 SocketProcessor 持有 Nio2SocketWrapper 对象。


Executor 在执行 SocketProcessor 时,SocketProcessor 的 run 方法会调用 Http11Processor 来处理请求,Http11Processor 会通过 Nio2SocketWrapper 读取和解析请求数据,请求经过容器处理后,再把响应通过 Nio2SocketWrapper 写出。


需要你注意 Nio2Endpoint 跟 NioEndpoint 的一个明显不同点是,Nio2Endpoint 中没有 Poller 组件,也就是没有 Selector。这是为什么呢?因为在异步 I/O 模式下,Selector 的工作交给内核来做了。


接下来我详细介绍一下 Nio2Endpoint 各组件的设计。


Nio2Acceptor

和 NioEndpint 一样,Nio2Endpoint 的基本思路是用 LimitLatch 组件来控制连接数,但是 Nio2Acceptor 的监听连接的过程不是在一个死循环里不断地调 accept 方法,而是通过回调函数来完成的。我们来看看它的连接监听方法:

    serverSock.accept(null, this);


    其实就是调用了 accept 方法,注意它的第二个参数是 this,表明 Nio2Acceptor 自己就是处理连接的回调类,因此 Nio2Acceptor 实现了 CompletionHandler 接口。那么它是如何实现 CompletionHandler 接口的呢?



      protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
      implements CompletionHandler<AsynchronousSocketChannel, Void> {

      @Override
      public void completed(AsynchronousSocketChannel socket,
      Void attachment) {

      if (isRunning() && !isPaused()) {
      if (getMaxConnections() == -1) {
      //如果没有连接限制,继续接收新的连接
      serverSock.accept(null, this);
      } else {
      //如果有连接限制,就在线程池里跑run方法,run方法会检查连接数
      getExecutor().execute(this);
      }
      //处理请求
      if (!setSocketOptions(socket)) {
      closeSocket(socket);
      }
      }
      }

      可以看到 CompletionHandler 的两个模板参数分别是 AsynchronousServerSocketChannel 和 Void,我在前面说过第一个参数就是 accept 方法的返回值,第二个参数是附件类,由用户自己决定,这里为 Void。completed 方法的处理逻辑比较简单:

      • 如果没有连接限制,继续在本线程中调用 accept 方法接收新的连接。
      • 如果有连接限制,就在线程池里跑 run 方法去接收新的连接。那为什么要跑 run 方法呢,因为在 run 方法里会检查连接数,当连接达到最大数时,线程可能会被 LimitLatch 阻塞。为什么要放在线程池里跑呢?这是因为如果放在当前线程里执行,completed 方法可能被阻塞,会导致这个回调方法一直不返回。

      接着 completed 方法会调用 setSocketOptions 方法,在这个方法里,会创建 Nio2SocketWrapper 和 SocketProcessor,并交给线程池处理。


      Nio2SocketWrapper

      Nio2SocketWrapper 的主要作用是封装 Channel,并提供接口给 Http11Processor 读写数据。讲到这里你是不是有个疑问:Http11Processor 是不能阻塞等待数据的,按照异步 I/O 的套路,Http11Processor 在调用 Nio2SocketWrapper 的 read 方法时需要注册回调类,read 调用会立即返回,问题是立即返回后 Http11Processor 还没有读到数据,怎么办呢?这个请求的处理不就失败了吗?

      为了解决这个问题,Http11Processor 是通过 2 次 read 调用来完成数据读取操作的。

      • 第一次 read 调用:连接刚刚建立好后,Acceptor 创建 SocketProcessor 任务类交给线程池去处理,Http11Processor 在处理请求的过程中,会调用 Nio2SocketWrapper 的 read 方法发出第一次读请求,同时注册了回调类 readCompletionHandler,因为数据没读到,Http11Processor 把当前的 Nio2SocketWrapper 标记为数据不完整。接着SocketProcessor 线程被回收,Http11Processor 并没有阻塞等待数据。这里请注意,Http11Processor 维护了一个 Nio2SocketWrapper 列表,也就是维护了连接的状态。
      • 第二次 read 调用:当数据到达后,内核已经把数据拷贝到 Http11Processor 指定的 Buffer 里,同时回调类 readCompletionHandler 被调用,在这个回调处理方法里会重新创建一个新的 SocketProcessor 任务来继续处理这个连接,而这个新的 SocketProcessor 任务类持有原来那个 Nio2SocketWrapper,这一次 Http11Processor 可以通过 Nio2SocketWrapper 读取数据了,因为数据已经到了应用层的 Buffer。

      这个回调类 readCompletionHandler 的源码如下,最关键的一点是,Nio2SocketWrapper 是作为附件类来传递的,这样在回调函数里能拿到所有的上下文。

        this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
        public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
        ...
        //通过附件类SocketWrapper拿到所有的上下文
        Nio2SocketWrapper.this.getEndpoint().processSocket(attachment, SocketEvent.OPEN_READ, false);
        }


        public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
        ...
        }
        }
        文章转载自Alleria Windrunner,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论