暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Netty分享之EventLoop

逐码 2018-02-10
216

gris 转载请注明原创出处,谢谢!

Netty高性能的原因除了他设计堪称完美的IO模型外,另外一个原因就是他的线程模型。

有关netty线程模型的内容不是本篇文章将要分享的,我只简单的描述下netty线程模型的几个类别,具体的内容可以查询相关的资料,大体上netty的线程模型可以分成以下几种:

  • 单线程模型

所有I/O操作都由一个线程完成,即多路复用、事件分发和处理都是在一个Reactor线程上完成的

  • 多线程模型

一个Acceptor负责接收请求,一个Reactor Thread Pool负责处理I/O操作

  • 主从多线程模型

一个Acceptor负责接收请求,一个Main Reactor Thread Pool负责连接,一个Sub Reactor Thread Pool负责处理I/O操作

在网络连接从开始到结束的这段生命周期里,我们需要处理连接中的事件,而这就需要为这些事件创建并执行相应的任务。而在netty中一个Channel通道从建立起来时就会为他分配一个EventLoop,用来处理该通道上需要执行的事件,并且该EventLoop直到连接断开的整个过程中都不会发生变化。 

一个EventLoop的内部是一个Thread在支撑,Netty线程模型的卓越性能之一取决于对当前执行的Thread的身份的确定,通过调用EventLoop的inEventLoop(Thread thread)方法来确定。就是在执行一个任务时,确定当前线程是否是分配给当前Channel以及Channel的EventLoop的那一个线程。如果当前线程正好就是支撑EventLoop的那个线程,那么提交给EventLoop的任务将会被直接执行,否则EventLoop会把该任务放入一个内部的队列中进行调度,以便稍后在下一次处理事件时执行。用一个简单的图表示如下:

鉴于Netty线程模型的基石是建立在EventLoop上的,我们今天就来详细的了解下EventLoop。 首先从设计上来看,EventLoop采用了一种协同设计,它建立在两个基本的API之上:Concurrent和Channel,也就是并发和网络。并发是因为它采用了线程池来管理大量的任务,并且这些任务可以并发的执行。其继承了EventExecutor接口,而EventExecutor就是一个事件的执行器。另外为了与Channel的事件进行交互,EventLoop继承了EventLoopGroup接口。一个详细的EventLoop类继承层次结构如下:

一个Netty服务端启动时,通常会有两个NioEventLoopGroup:一个boss,一个worker。第一个NioEventLoopGroup正常只需要一个EventLoop,主要负责客户端的连接请求,然后打开一个Channel,交给第二个NioEventLoopGroup中的一个EventLoop来处理这个Channel上的所有读写事件。一个Channel只会被一个EventLoop处理,而一个EventLoop可能会被分配给多个Channel。

我们知道每个EventLoop只要一个Thread来支撑并处理事件,可以在SingleThreadEventExecutor类中找到这个thread:

  1. /**

  2. * 用来执行任务的线程

  3. * 该线程其实就是用来支撑SingleThreadEventLoop的

  4. */

  5. private volatile Thread thread;

通过debug,我们可以发现服务器启动过程中 bootstrap.bind(PORT).sync()
最终会执行到 AbstractBootstrap.doBind0(regFuture,channel,localAddress,promise)
的方法,而doBind0方法如下:

  1. private static void doBind0(

  2.    final ChannelFuture regFuture, final Channel channel,

  3.    final SocketAddress localAddress, final ChannelPromise promise) {


  4.    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up

  5.    // the pipeline in its channelRegistered() implementation.

  6.    channel.eventLoop().execute(new Runnable() {

  7.        @Override

  8.        public void run() {

  9.            if (regFuture.isSuccess()) {

  10.                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

  11.            } else {

  12.                promise.setFailure(regFuture.cause());

  13.            }

  14.        }

  15.    });

  16. }  

最终会执行到EventLoop的execute方法,而execute方法是在Executor接口中定义的,最终会由SingleThreadEventExecutor来执行该execute方法,一个SingleThreadEventExecutor的调用链大致如下:

  1. #1 SingleThreadEventExecutor.execute(Runnable task)

  2.    -> #2 SingleThreadEventExecutor.startThread()

  3.    -> #2 SingleThreadEventExecutor.doStartThread()

  4.    -> #2 SingleThreadEventExecutor.this.run()

  5.        -> #3 NioEventLoop.run() / for(;;)-loop

  6.            -> #4 NioEventLoop.select(boolean oldWakenUp)

  7.            -> #4 SingleThreadEventExecutor.runAllTasks(long timeoutNanos)

从代码中可以知道下面一些结论: 1、SingleThreadEventExecutor中的Thread的初始化在doStartThread这个方法中 2、最后会调用到SingleThreadEventExecutor.this.run()方法,而该run方法是在NioEventLoop中实现的 3、NioEventLoop中的run方法通过一个 for(;;)
循环来处理Channel事件的类

让我们来看一下NioEventLoop中的run方法:

  1. protected void run() {

  2.    for (;;) {

  3.        try {

  4.            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

  5.                case SelectStrategy.CONTINUE:

  6.                    continue;

  7.                case SelectStrategy.SELECT:

  8.                    select(wakenUp.getAndSet(false));

  9.                    if (wakenUp.get()) {

  10.                        selector.wakeup();

  11.                    }

  12.                    // fall through

  13.                default:

  14.            }

  15.            cancelledKeys = 0;

  16.            needsToSelectAgain = false;

  17.            final int ioRatio = this.ioRatio;

  18.            if (ioRatio == 100) {

  19.                try {

  20.                    processSelectedKeys();

  21.                } finally {

  22.                    // Ensure we always run tasks.

  23.                    runAllTasks();

  24.                }

  25.            } else {

  26.                final long ioStartTime = System.nanoTime();

  27.                try {

  28.                    processSelectedKeys();

  29.                } finally {

  30.                    // Ensure we always run tasks.

  31.                    final long ioTime = System.nanoTime() - ioStartTime;

  32.                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

  33.                }

  34.            }

  35.        } catch (Throwable t) {

  36.            handleLoopException(t);

  37.        }

  38.        // Always handle shutdown even if the loop processing threw an exception.

  39.        try {

  40.            if (isShuttingDown()) {

  41.                closeAll();

  42.                if (confirmShutdown()) {

  43.                    return;

  44.                }

  45.            }

  46.        } catch (Throwable t) {

  47.            handleLoopException(t);

  48.        }

  49.    }

  50. }

在这个 for(;;)
循环里面,首先会通过 selectStrategy.calculateStrategy(selectNowSupplier,hasTasks())
判断当前有没有可以执行的任务,没有则继续循环,如果有任务,那就开始处理任务。在处理事件的时候,不仅要处理IO事件,还要处理非I/O事件。其中 ioRatio
变量表示的是:处理I/O事件和非I/O事件的时间占比,默认为50%。 当且仅当 isShuttingDown()
为true的时候,才开始停止循环。

我们可以通过在关键代码里打印一些日志,来查看服务端启动过程中都执行了哪些事情,例如,可以在每个方法的第一句增加一个 System.out.println()
,具体要在哪些方法中加,各位可以自己去尝试,下面我把我增加后服务端启动时打印的日志信息列在下面:

  1. gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main

  2. gris-debug-SingleThreadEventExecutor-execute,task=io.netty.channel.AbstractChannel$AbstractUnsafe$1@795cd85e

  3. gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main

  4. gris-debug-SingleThreadEventExecutor-startThread

  5. gris-debug-SingleThreadEventExecutor-doStartThread

  6. gris-debug-NioEventLoop-run

  7. gris-debug-NioEventLoop-select

  8. gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos

  9. gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.channel.AbstractChannel$AbstractUnsafe$1@795cd85e

  10. gris-debug-SingleThreadEventExecutor-runAllTasksFrom

  11. gris-debug-NioEventLoop-select

  12. gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos

  13. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  14. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  15. gris-debug-SingleThreadEventExecutor-execute,task=io.netty.bootstrap.ServerBootstrap$1$1@5a627d0e

  16. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  17. gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.bootstrap.ServerBootstrap$1$1@5a627d0e

  18. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  19. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  20. gris-debug-SingleThreadEventExecutor-execute,task=io.netty.bootstrap.AbstractBootstrap$2@41dad0a4

  21. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  22. gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.bootstrap.AbstractBootstrap$2@41dad0a4

  23. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  24. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  25. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  26. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  27. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  28. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  29. gris-debug-SingleThreadEventExecutor-execute,task=io.netty.channel.AbstractChannel$AbstractUnsafe$2@5da6b8da

  30. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  31. gris-debug-SingleThreadEventExecutor-addTask,task=io.netty.channel.AbstractChannel$AbstractUnsafe$2@5da6b8da

  32. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  33. gris-debug-SingleThreadEventExecutor-inEventLoop=false,currentThread=main

  34. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  35. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  36. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  37. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  38. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  39. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  40. gris-debug-SingleThreadEventExecutor-inEventLoop=true,currentThread=nioEventLoopGroup-2-1

  41. gris-debug-SingleThreadEventExecutor-task poll from taskQueue is null,will break

  42. gris-debug-SingleThreadEventExecutor-runAllTasksFrom

  43. gris-debug-NioEventLoop-select

  44. gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos

  45. gris-debug-SingleThreadEventExecutor-runAllTasksFrom

  46. gris-debug-NioEventLoop-select

  47. gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos

  48. gris-debug-SingleThreadEventExecutor-runAllTasksFrom

  49. gris-debug-NioEventLoop-select

  50. gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos

  51. gris-debug-SingleThreadEventExecutor-runAllTasksFrom

  52. gris-debug-NioEventLoop-select

  53. gris-debug-SingleThreadEventExecutor-runAllTasks with timeoutNanos

  54. gris-debug-SingleThreadEventExecutor-runAllTasksFrom

可以看到每一步具体执行所在的类和方法,可以发现最终程序停在 NioEventLoop.run()
方法中,循环着,等待事件的到来,然后进行处理。

我是gris,如果文章对您有帮助,欢迎您点赞加关注,并欢迎您关注我的公众号:


文章转载自逐码,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论