暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Netty 源码分析 —— NIO 基础(一)之IO模型超详细讲解

努力努力再努力xLg 2020-02-10
165

我准备战斗到最后,不是因为我勇敢,是我想见证一切。--双雪涛《猎人》

[TOC] Thinking

  1. 一个技术,为什么要用它,解决了那些问题?

  2. 如果不用会怎么样,有没有其它的解决方法?

  3. 对比其它的解决方案,为什么最终选择了这种,都有何利弊?

  4. 你觉得项目中还有那些地方可以用到,如果用了会带来那些问题?

  5. 这些问题你又如何去解决的呢?

本文基于Netty 4.1.45.Final-SNAPSHOT

首先推荐阅读 Java I/O模型从BIO到NIO和Reactor模式了解Java中各种I/O模型,同步与异步、阻塞与非阻塞的区分。

简单介绍一下Unix下的五种I/O模型:


    • 阻塞 I/O

  • 非阻塞 I/O

  • I/O 多路复用(select和poll)

  • 信号驱动 I/O(SIGIO)

  • 异步 I/O(Posix.1的aio_系列函数)

阻塞I/O

所谓阻塞I/O,就是在该I/O下请求无法立即完成则保持阻塞。其它操作需要等待。即为阻塞。

  • 阶段1:等待数据就绪。

  • 网络I/O的情况就是等待远程数据陆续抵达;

  • 磁盘I/O的情况就是等待磁盘数据从磁盘上读取到内核态内存中。

  • 阶段2:数据拷贝。

  • 出入系统安全,用户态的程序没有权限直接读取内核态内存,因此内核态负责把内核态内存中的数据拷贝一份到用户态内存中。

通俗的讲:比如read和write,通常IO操作都是阻塞I/O的,也就是说当你调用read时,如果没有数据收到,那么线程或者进程就会被挂起,直到收到数据。阻塞的意思,就是一直等着。阻塞I/O就是等着数据过来,进行读写操作。应用的函数进行调用,但是内核一直没有返回,就一直等着。应用的函数长时间处于等待结果的状态,我们就称为阻塞I/O。每个应用都得等着,每个应用都在等着,浪费啊!很像现实中的情况。大家都不干活,等着数据过来,过来工作一下,没有的话继续等着。

Bio代码示例

BIO示例

Server

  1. /**

  2. * BIO 服务端

  3. *

  4. * @author by Mr. Li

  5. * @version 1.0

  6. * @date 2020/2/5 20:36

  7. */

  8. @Slf4j

  9. public final class Server{

  10. // 默认的端口号

  11. private static int DEFAULT_PORT = 9999;

  12. // 实例化serverSocket

  13. private static ServerSocket server;


  14. private static Socket socket = null;


  15. // 根据默认端口号 启动服务器

  16. public static void start() {

  17. try {

  18. start(DEFAULT_PORT);

  19. } catch (Exception e) {

  20. log.error("服务器异常:{}", e.getMessage());

  21. }

  22. }


  23. private static void start(int port) throws IOException {


  24. try {

  25. server = new ServerSocket(port);

  26. log.info("服务端启动成功,端口号为:{}", port);


  27. // 使用accept 阻塞

  28. while (true) {

  29. socket = server.accept();

  30. new Thread(new Runnable() {

  31. @Override

  32. public void run() {

  33. BufferedReader br = null;

  34. PrintWriter pw = null;

  35. try {

  36. br = new BufferedReader(new InputStreamReader(socket.getInputStream()));

  37. pw = new PrintWriter(socket.getOutputStream(), true);

  38. String expression;

  39. String result;


  40. while (true) {

  41. if ((expression = br.readLine()) == null) {

  42. break;

  43. }

  44. log.info("服务器接收到的信息:{}", expression);

  45. result = JSON.toJSONString(expression);

  46. log.info("result :{}", result);

  47. pw.println("Client IP:" + socket.getLocalAddress() + ",Port:" + socket.getPort() + ", Message : " + result);


  48. }

  49. } catch (Exception e) {

  50. log.error("服务器出现异常,{}", e.getCause().getMessage());

  51. } finally {

  52. if (br != null) {

  53. try {

  54. br.close();

  55. } catch (IOException e) {

  56. e.printStackTrace();

  57. }

  58. br = null;

  59. }

  60. if (pw != null) {

  61. pw.close();

  62. pw = null;

  63. }

  64. if (socket != null) {

  65. try {

  66. socket.close();

  67. } catch (IOException e) {

  68. e.printStackTrace();

  69. }

  70. socket = null;

  71. }

  72. }

  73. }

  74. }).start();

  75. }

  76. } catch (Exception e) {

  77. log.error("服务器出现异常,{}", e.getCause().getMessage());

  78. } finally {

  79. if (server != null) {

  80. log.info("服务器已关闭");

  81. server.close();

  82. server = null;

  83. }

  84. }

  85. }

  86. }

Client

  1. /**

  2. * BIO 客户端

  3. *

  4. * @author by Mr. Li

  5. * @date 2020/2/5 21:54

  6. */

  7. @Slf4j

  8. public class Client {

  9. // 默认的端口号

  10. private static int DEFAULT_PORT = 9999;

  11. private static String DEFAULT_SERVER_IP = "127.0.0.1";


  12. public static void send(String expression) {

  13. send(DEFAULT_PORT, DEFAULT_SERVER_IP, expression);

  14. }


  15. private static void send(int defaultPort, String defaultServerIp, String expression) {

  16. log.info("数据接受成功:{}", expression);

  17. Socket socket = null;

  18. BufferedReader br = null;

  19. PrintWriter pw = null;


  20. try {

  21. socket = new Socket(defaultServerIp, defaultPort);

  22. // 获取 输出流

  23. br = new BufferedReader(new InputStreamReader(socket.getInputStream()));

  24. // 获取 输入流

  25. pw = new PrintWriter(socket.getOutputStream(), true);

  26. pw.println(expression);

  27. log.info("Client Result : {}", br.readLine());

  28. } catch (Exception e) {

  29. log.error("Client Exception :{}", e.getCause().getMessage());

  30. } finally {

  31. if (br != null) {

  32. try {

  33. br.close();

  34. } catch (IOException e) {

  35. e.printStackTrace();

  36. }

  37. br = null;

  38. }

  39. if (pw != null) {

  40. pw.close();

  41. pw = null;

  42. }

  43. if (socket != null) {

  44. try {

  45. socket.close();

  46. } catch (IOException e) {

  47. e.printStackTrace();

  48. }

  49. socket = null;

  50. }

  51. }

  52. }

  53. }

测试主函数

  1. /**

  2. * BIO 代码实现

  3. * 阻塞IO 的代码实现

  4. * <p>

  5. * 测试代码

  6. *

  7. * @author by Mr. Li

  8. * @date 2020/2/5 20:35

  9. */

  10. @Slf4j

  11. public class ServerMain {

  12. public static void main(String[] args) {

  13. // 运行服务器

  14. new Thread(() -> {

  15. Server.start();

  16. }).start();

  17. }

  18. }


  19. /**

  20. * BIO 客户端启动

  21. *

  22. * @author by Mr. Li

  23. * @date 2020/2/6 11:50

  24. */

  25. @Slf4j

  26. public class ClientMain {

  27. public static void main(String[] args) {

  28. while (true) {


  29. StringBuilder sb = new StringBuilder();

  30. // 键盘录入

  31. Scanner scanner = new Scanner(System.in);

  32. log.info("Login UserName <br/>");

  33. String userName = scanner.nextLine();

  34. sb.append("UserName :").append(userName + ";");

  35. log.info("What Are U Doing ? <br/>");

  36. String doing = scanner.nextLine();

  37. sb.append("Doing :").append(doing);


  38. // 运行客户端

  39. new Thread(() -> Client.send(sb.toString())).start();

  40. }

  41. }

  42. }

可以看出BIO的问题就是,当客户端向服务端发送数据时,服务端只能等待,相反客户端在等待服务端发送数据时也是如此。性能非常的低下。

BIO 模型

针对BIO的通讯模型提出的问题

一请求一应答模式,为每个请求创建一个线程,同一时间只能处理一个请求,等待I/O的过程浪费大量CPU的资源。同时无法充分使用多CPU的优势。

针对这种问题,即引出了伪异步I/O模型。用于解决一请求一应答的模式,使用线程池装载所有的请求,然后使用新的线程一一处理请求。避免了传统BIO为每一个请求创建线程造成大量的资源浪费的问题。

伪异步I/O

server

  1. /**

  2. * Fake BIO Client

  3. * 使用线程池管理

  4. *

  5. * @author by Mr. Li

  6. * @date 2020/2/6 14:18

  7. */

  8. @Slf4j

  9. public class FakeExecutorServer {

  10. public static void main(String[] args) {

  11. // 定义线程池

  12. ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  13. ServerSocket serverSocket = null;


  14. try {

  15. serverSocket = new ServerSocket(Configuration.DEFAULT_PORT);

  16. log.info("Client connection from :{}", serverSocket.getInetAddress());

  17. while (true) {

  18. Socket accept = serverSocket.accept();

  19. // 使用线程池

  20. executor.submit(() -> {

  21. Configuration.getAccept(accept);

  22. });

  23. }


  24. } catch (Exception e) {

  25. log.info("FAKE Executor Servers Exception : {}", e.getMessage());

  26. }

  27. }

  28. }

  29. public static void getAccept(Socket accept) {

  30. BufferedReader in = null;

  31. try {

  32. in = new BufferedReader(new InputStreamReader(accept.getInputStream()));

  33. // 读取 网络流 数据,使用commons-io 中的io工具类

  34. List<String> strings = IOUtils.readLines(in);

  35. String message = strings.stream().collect(Collectors.joining());

  36. // String message = IOUtils.toString(in);

  37. log.info("From IP : " + accept.getInetAddress() + ",Port :" + accept.getPort() + ",Message :" + message);

  38. } catch (Exception e) {

  39. log.info("Fake BIO Server Is Exception : {}", e.getMessage());

  40. } finally {

  41. IOUtils.closeQuietly(in);

  42. IOUtils.closeQuietly(accept);

  43. }

  44. }

伪异步I/O模型

 当有新的客户端接入时,将客户端的Socket封装成一个Task(该任务实现Java.lang.Runnablle接口)投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列和N个活跃线程对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。由于线程池和消息队列都是有界的,因此,无论客户端并发连接数多大,它都不会导致线程个数过于膨胀或者内存溢出,相对于传统的一连接一线程模型,是一种改良。伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。但是由于它底层的通信依然采用同步阻塞模型,因此无法从根本上解决问题。通过对输入和输出流的API文档进行分析,我们了解到读和写操作都是同步阻塞的,阻塞的时间取决于对方IO线程的处理速度和网络IO的传输速度,本质上讲,我们无法保证生产环境的网络状况和对端的应用程序能足够快,如果我们的应用程序依赖对方的处理速度,它的可靠性就会非常差。

伪异步I/O原则上还是同步阻塞的,因为数据读取和返回都是阻塞的,都需要等待并且同步。所以这里有个很重要的概念就是:同步与异步的区别,简单的说就是同步:正在运行的线程需要等待结果伪同步。异步:正在运行的线程不需要等待结果,可以接着做其他事情为异步。

在Java IO 中,在JDK1.4之前java IO 一直都是Java大家庭中的一个短板,同步且阻塞的模型,大大的降低了性能。所以在JDK 1.4之后引入了一个重要的概念 NIO
(New IO Non Block IO)

在介绍NIO之前,我们首先来了解一下 Reactor
模式,因为NIO就是基于这种模型来实现非阻塞I/O(但是在本质上NIO 还是同步的,并不是异步的)

这里好好的解释一下 NIO 为什么是同步非阻塞的

  • Nio底层是epoll,在该定义中,该底层就是同步非阻塞的

  • 同步和异步的区别就在于:

  • 同步在整个I/O操作中导致进程阻塞,而异步不会。

  • 异步非阻塞IO(AIO)属于以下情况:

  • 当发送一个I/O请求,操作系统会帮忙做好所有的事情,包括获取I/O流,以及将I/O流从内核拷贝到用户空间,用户只需要接收到完成的通知即可。

  • 而NIO属于多路复用I/O,

  • Nio虽然在获取内核的数据时,并不会阻塞,但是它却在将内核数据拷贝到用户空间的时候线程阻塞,所以NIO是同步非阻塞的。

多路复用I/O--NIO

Reactor模式

 在Reactor中,把大粒度线程拆分,这些被拆分的小线程或者子过程对应的是handler,每一种handler会出处理一种event。这里会有一个全局的管理者selector,我们需要把channel注册感兴趣的事件,那么这个selector就会不断在channel上检测是否有该类型的事件发生,如果没有,那么主线程就会被阻塞,否则就会调用相应的事件处理函数即handler来处理。典型的事件有连接,读取和写入,当然我们就需要为这些事件分别提供处理器,每一个处理器可以采用线程的方式实现。一个连接来了,显示被读取线程或者handler处理了,然后再执行写入,那么之前的读取就可以被后面的请求复用,吞吐量就提高了。

可以简单理解为,基于事件来完成,底层原理:epoll Selector 底层初始化时候回申请一个 128 长度的事件数组 当 epoll 通知事件时候 jvm native 层会直接往这事件数组里插事件 你要做的事就是把事件拿出来,然后进行相应的处理。

最简单经典的Reactor模式

在这种经典的Reactor模式中包含的角色如下:

  • Reactor:将I/O事件派发给对应的Handler。

  • Acceptor:处理客户端连接请求

  • Handlers:执行非阻塞读/写,标识系统管理的资源;同时将handler与事件绑定。

代码实现

  1. /**

  2. * NIO 简单的Reactor模型研究

  3. * <p>

  4. * 单线程的 Reactor 模型

  5. *

  6. * @author by Mr. Li

  7. * @version 1.0

  8. * @date 2020/2/6 20:36

  9. */

  10. @Slf4j

  11. public class ReactorServer {

  12. public static void main(String[] args) throws Exception {

  13. // 获取Selector选择器

  14. Selector selector = Selector.open();

  15. // 获取Channel 通道

  16. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

  17. // 开启 非阻塞

  18. serverSocketChannel.configureBlocking(false);

  19. serverSocketChannel.bind(new InetSocketAddress(Configuration.DEFAULT_PORT));

  20. // 在通道上注册 选择器,并且注册事件

  21. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);


  22. while (selector.select() > 0) {

  23. Set<SelectionKey> keys = selector.selectedKeys();

  24. Iterator<SelectionKey> iterator = keys.iterator();

  25. while (iterator.hasNext()) {

  26. SelectionKey next = iterator.next();

  27. iterator.remove();

  28. if (next.isAcceptable()) {

  29. ServerSocketChannel channel = (ServerSocketChannel) next.channel();

  30. // 监听

  31. SocketChannel accept = channel.accept();

  32. accept.configureBlocking(false);

  33. log.info("Accept request from {}", accept.getRemoteAddress());

  34. accept.register(selector, SelectionKey.OP_READ);

  35. } else if (next.isReadable()) {

  36. SocketChannel channel = (SocketChannel) next.channel();

  37. // 使用Buffer 读取数据

  38. ByteBuffer buffer = ByteBuffer.allocate(1024);

  39. int count = channel.read(buffer);

  40. if (count <= 0) {

  41. channel.close();

  42. next.cancel();

  43. log.info("After sending data, close the connection");

  44. continue;

  45. }

  46. log.info("Received message {}", new String(buffer.array()));

  47. }

  48. keys.remove(next);

  49. }

  50. }

  51. }

  52. }

从上面的代码 21、35行代码可以看出,多个 Channel
可以注册到同一个 Selector
中。实现了同一个线程同时监控多个请求状态(Channel中绑定事件),同时注册时需要指定它所关注的事件。

#selector.select()
是阻塞的,当有至少一个通道可用时,该方法返回可用通道个数。同时该方法只捕获Channel注册时指定的所关注的事件。

多工作线程Reactor模式

在上述经典Reactor模式中,尽管一个线程可同时监控多个请求 Channel
,但是所有的I/O读写操作,以及新的连接请求都是在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。

针对这种情况,思考到 BIO
中的一请求一应答的模式的优化采用的伪异步操作,使用线程池,并行处理多个读/写操作,如下图:

  • Reactor:将I/O事件派发给对应的Handler。

  • Acceptor:处理客户端连接请求

  • Handlers:执行非阻塞读/写,标识系统管理的资源;同时将handler与事件绑定。

修改后的代码如下:

  1. /**

  2. * @author by Mr. Li

  3. * @date 2020/2/6 21:18

  4. */

  5. @Slf4j

  6. public class ReactorExecutorServer {

  7. public static void main(String[] args) throws Exception {

  8. // 开启连接

  9. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

  10. serverSocketChannel.bind(new InetSocketAddress(Configuration.DEFAULT_PORT));

  11. // 获取 Selector对象

  12. Selector selector = Selector.open();

  13. // 开启 非阻塞

  14. serverSocketChannel.configureBlocking(false);

  15. // 注册选择器

  16. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);


  17. while (true) {

  18. if (selector.selectNow() < 0) {

  19. continue;

  20. }

  21. Set<SelectionKey> selectionKeys = selector.selectedKeys();

  22. Iterator<SelectionKey> iterator = selectionKeys.iterator();

  23. while (iterator.hasNext()) {

  24. SelectionKey key = iterator.next();

  25. iterator.remove();

  26. // 对应上面的 监听事件

  27. if (key.isAcceptable()) {

  28. ServerSocketChannel acceptChannel = (ServerSocketChannel) key.channel();

  29. SocketChannel accept = acceptChannel.accept();

  30. accept.configureBlocking(false);

  31. log.info("Accept request from {}",accept.getRemoteAddress());

  32. // 注册 读取事件

  33. SelectionKey redKey = accept.register(selector, SelectionKey.OP_READ);

  34. // 在selector 对象上附加一个对象,稍后可以通过{@link #attachment()* Attachment}方法来检索附加的对象。

  35. redKey.attach(new Processor());

  36. }else if (key.isReadable()){

  37. Processor processor = (Processor)key.attachment();

  38. processor.process(key);

  39. }

  40. }

  41. }

  42. }

  43. }

在SocketChannel对象上注册OP_READ事件后,得到的SelectorKey对象绑定一个对象,(在上述例子中,该对象用于处理请求),并且在获取到可读事件后,可以取出这个对象。

该对象,维护一个线程池,并不直接处理IO操作。

#Processor()

  1. /**

  2. * 使用线程池

  3. *

  4. * @author by Mr. Li

  5. * @date 2020/2/6 21:29

  6. */

  7. @Slf4j

  8. public class Processor {


  9. private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(16);


  10. public void process(SelectionKey key) {

  11. EXECUTOR_SERVICE.submit(() -> {

  12. // 使用 buffer 缓存

  13. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

  14. SocketChannel socketChannel = (SocketChannel) key.channel();

  15. int count = socketChannel.read(byteBuffer);

  16. if (count < 0) {

  17. socketChannel.close();

  18. key.cancel();

  19. log.info("{} Read ended", socketChannel);

  20. return null;

  21. } else if (count == 0) {

  22. return null;

  23. }

  24. log.info("{} Read message {}", socketChannel, new String(byteBuffer.array()));

  25. return null;

  26. });

  27. }

  28. }

上述代码,就是应用了线程池,并发的处理I/O操作,使I/O操作和新请求不阻塞。

但是根据上图看出此模式还是存在一个问题,当新的请求(Channel)和I/O操作变多时,上述的模型中 Reactor
就会显得力不从心。因为整个模型中,它始终单独的一个。

多Reactor

Netty中使用的 Reactor
模式,引入了多 Reactor
,即:

一个主 Reactor
负责监控所有的连接请求,多个子Reactor负责监控并处理I/O请求,减轻了主Reactor的压力,降低了主 Reactor
压力太大造成的延迟。

并且在Netty中,每个子 Reactor
分别属于独立的线程,每个成功连接后的 Channel
的所有操作由同一个线程处理。

这样充分的保证了同一请求的所有状态和上下文都在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求相应状态。

如图所示:

  • Reactor:将I/O事件派发给对应的Handler。

  • Acceptor:处理客户端连接请求

  • Handlers:执行非阻塞读/写,标识系统管理的资源;同时将handler与事件绑定。

代码如下:

  1. /**

  2. * Netty 提供的 多Reactor 模型

  3. *

  4. * @author by Mr. Li

  5. * @date 2020/2/6 21:56

  6. */

  7. @Slf4j

  8. public class MultiReactorExecutorServer {



  9. public static void main(String[] args) throws Exception {

  10. Selector selector = Selector.open();

  11. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

  12. serverSocketChannel.bind(new InetSocketAddress(Configuration.DEFAULT_PORT));

  13. serverSocketChannel.configureBlocking(false);

  14. // 注册事件

  15. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);


  16. // 获取当前系统cpu个数

  17. int count = Runtime.getRuntime().availableProcessors();

  18. Processor[] processors = new Processor[2 * count];

  19. // 创建对象数组,如果不赋值,对象会全部为null,会报空指针异常

  20. for (int i = 0; i < processors.length; i++) {

  21. processors[i] = new Processor();

  22. }

  23. int index = 0;

  24. while (selector.select() > 0) {

  25. Set<SelectionKey> selectionKeys = selector.selectedKeys();

  26. for (SelectionKey selectionKey : selectionKeys) {

  27. selectionKeys.remove(selectionKey);

  28. if (selectionKey.isAcceptable()) {

  29. ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();

  30. SocketChannel socketChannel = serverSocketChannel1.accept();

  31. socketChannel.configureBlocking(false);

  32. log.info("Accept request from {}", socketChannel.getRemoteAddress());

  33. // round robin 轮循

  34. Processor processor = processors[(int) (index++ % count)];

  35. processor.addChannel(socketChannel);

  36. processor.wakeup();

  37. }

  38. }

  39. }

  40. }

  41. }

当主Reactor 监控到成功连接的Channel后,会通过轮询的方式交给不同的子Reactor。

  1. /**

  2. * 多Reactor

  3. 子Reactor 用于处理IO 操作

  4. *

  5. * @author by Mr. Li

  6. * @date 2020/2/6 21:29

  7. */

  8. @Slf4j

  9. public class Processor {


  10. private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);


  11. public void process(SelectionKey key) {

  12. // 子Reactor 负责I/O

  13. EXECUTOR_SERVICE.submit(() -> {

  14. // 使用 buffer 缓存

  15. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

  16. SocketChannel socketChannel = (SocketChannel) key.channel();

  17. int count = socketChannel.read(byteBuffer);

  18. if (count < 0) {

  19. socketChannel.close();

  20. key.cancel();

  21. log.info("{} Read ended", socketChannel);

  22. return null;

  23. } else if (count == 0) {

  24. return null;

  25. }

  26. log.info("{} Read message {}", socketChannel, new String(byteBuffer.array()));

  27. return null;

  28. });

  29. }


  30. private Selector selector;


  31. /**

  32. * 为每一个Reactor 创建单独的 selector 实现单线程管理,减少线程之间切换的资源消耗

  33. * 并且 没创建一个新的对象,都会向线程池中提交一个任务。

  34. * @throws IOException

  35. */

  36. public Processor() throws IOException {

  37. this.selector = SelectorProvider.provider().openSelector();

  38. start();

  39. }


  40. private void start() {

  41. EXECUTOR_SERVICE.submit(() -> {

  42. while (true) {

  43. if (selector.select(500) <= 0) {

  44. continue;

  45. }

  46. Set<SelectionKey> selectionKeys = selector.selectedKeys();

  47. Iterator<SelectionKey> iterator = selectionKeys.iterator();

  48. while (iterator.hasNext()) {

  49. SelectionKey key = iterator.next();

  50. selectionKeys.remove(key);

  51. if (key.isReadable()) {

  52. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

  53. SocketChannel socketChannel = (SocketChannel) key.channel();

  54. int count = socketChannel.read(byteBuffer);

  55. if (count < 0) {

  56. socketChannel.close();

  57. key.cancel();

  58. log.info("Read Ended :{}", socketChannel);

  59. continue;

  60. } else if (count == 0) {

  61. log.info("{} Message size is 0", socketChannel);

  62. continue;

  63. } else {

  64. log.info("{} Read message {}", socketChannel, new String(byteBuffer.array()));

  65. }

  66. }

  67. }

  68. }

  69. });

  70. }


  71. public void addChannel(@NotNull SocketChannel socketChannel) throws ClosedChannelException {

  72. socketChannel.register(this.selector, SelectionKey.OP_READ);

  73. }


  74. public void wakeup() {

  75. selector.wakeup();

  76. }

  77. }

在Processor中,创建了静态的线程池,为机器核数的两倍。

每个Processor实例均包含一个Selector实例。同时每次获取Processor实例时均会提交一个任务到该线程池中,并且该任务正常情况下会一直循环下去,而提交给该Processor的SocketChannel通过在其Selector注册事件,加入到相应的任务中。由此实现了每个子Reactor包含一个Selector对象,并由独立的线程处理。


我们再回过头来看, UnixIO
中的五种IO模型。

NIO其实就是属于I/O多路复用性,是属于同步非阻塞I/O

I/O多路复用

I/O多路复用会用到select或者poll函数,这两个函数也会使线程阻塞,但是和阻塞I/O所不同的是,这两个函数可以同时阻塞多个I/O操作。而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。(这里运用了事件,用于监控读写的事件,触发读写操作)

从流程上来看,使用select函数进行I/O请求和同步阻塞模型没有太大的区别,甚至还多了添加监视Channel,以及调用select函数的额外操作,增加了额外工作。但是,使用 select以后最大的优势是用户可以在一个线程内同时处理多个Channel的I/O请求。用户可以注册多个Channel,然后不断地调用select读取被激活的Channel,即可达到在同一个线程内同时处理多个I/O请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

调用select/poll该方法由一个用户态线程负责轮询多个Channel,直到某个阶段1的数据就绪,再通知实际的用户线程执行阶段2的拷贝。通过一个专职的用户态线程执行非阻塞I/O轮询,模拟实现了阶段一的异步化。

AIO

AIO 调用aio_read 函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知的方式,然后立即返回。当内核将数据拷贝到缓冲区后,再通知应用程序。所以异步I/O模式下,阶段1和阶段2全部由内核完成,完成不需要用户线程的参与。

// TODO

AIO 在Netty 5 后被运用,但是存在重大Bug,已被弃用。所以现在Netty 4 底层还是使用的NIO。


NIO 基础

Java NIO( New IO 或者 Non Blocking IO ) ,从 Java 1.4 版本开始引入的非阻塞 IO ,用于替换标准( 有些文章也称为传统,或者 Blocking IO 。下文统称为 BIO ) Java IO API 的 IO API 。

在一些文章中,会将 Java NIO 描述成异步 IO ,实际是不太正确的:Java NIO 是同步 IO ,Java AIO ( 也称为 NIO 2 )是异步 IO。具体原因,推荐阅读文章:

  • 《异步和非阻塞一样吗? (内容涉及 BIO, NIO, AIO, Netty)》 。

  • 《BIO与NIO、AIO的区别(这个容易理解)》

总结来说,在 Unix IO 模型的语境下:

  • 同步和异步的区别:数据拷贝阶段是否需要完全由操作系统处理。

  • 阻塞和非阻塞操作:是针对发起 IO 请求操作后,是否有立刻返回一个标志信息而不让请求线程等待。

  • 从底层的角度来讲,NIO底层应用的epoll/Selector,底层初始化时候申请一个 128 长度的事件数组 当 epoll 通知事件时候 jvm native 层会直接往这事件数组里插事件 你要做的事就是把事件拿出来

  • 同步非阻塞,同步体现在 selector 仍然要去轮循判断 channel 是否准备好,非阻塞体现在这个过程中处理线程不会一直在等待,可以去做其他的事情。

因此,Java NIO 是同步且非阻塞的 IO 。

核心组件

NIO 三大神器:

  • Channel

  • Buffer

  • Selector

NIO/BIO对比

NIOBIO
基于缓冲区(Buffer)基于流(Stream)
非阻塞IO阻塞IO
选择器(Selector)

选择器是NIO的关键,是实现非阻塞的基础。

基于Buffer与基于Stream

BIO是面向字节流或者字符流的,而NIO的出现,就是为了解决这种传统的IO读取中,对流的等待出现的,为了摒弃这种情况,引出了 Channel
和 Buffer
的概念:从 Channel
中读取数据到 Buffer
中,或者将数据从 Buffer
中写到 Channel
中。(从这里可以看出,Buffer 是一个双向的缓冲区。)

阻塞与非阻塞 IO

Java IO 的各种流是阻塞的 IO 操作。这就意味着,当一个线程执行读或写 IO 操作时,该线程会被阻塞,直到有一些数据被读取,或者数据完全写入。


Java NIO 可以让我们非阻塞的使用 IO 操作。例如:

  • 当一个线程执行从 Channel 执行读取 IO 操作时,当此时有数据,则读取数据并返回;当此时无数据,则直接返回而不会阻塞当前线程

  • 当一个线程执行向 Channel 执行写入 IO 操作时,不需要阻塞等待它完全写入,这个线程同时可以做别的事情。

也就是说,线程可以将非阻塞 IO 的空闲时间用于在其他 Channel 上执行 IO 操作。所以,一个单独的线程,可以管理多个 Channel 的读取和写入 IO 操作。

Selector

Java NIO 引入了Selector(选择器)的概念,它是Java NIO 得以实现非阻塞IO的最最最最最关键

在Selector上,可以注册多个 Channel
到一个Selector中,而Selector内部的机制,就可以自动的对该Selector上注册的Channel进行查询(selector)操作,判断这些注册的 Channel
是否已就绪的IO事件(例如可读,可写,网络连接已完成)

通过这样的机制,一个线程通过使用一个Selector,就可以非常简单且高效的来管理多个Channle了。

Netty 为什么抛弃Netty5

具体为什么 Netty 4.1.X 版本不支持 Java AIO 的原因,可参见 《Netty(二):Netty 为啥去掉支持 AIO ?》 文章。

本文仅供笔者本人学习,一起进步!

——努力努力再努力xLg

阻塞I/O、非阻塞I/O和I/O多路复用(对三种I/O进行白话讲解)

Java I/O模型从BIO到NIO和Reactor模式(对Reactor模式 讲解细致)

Java的高性能IO——Reactor模式

Java NIO 到底是异步还是同步,阻塞还是非阻塞?

加油!


文章转载自努力努力再努力xLg,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论