暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Netty核心组件

爱做菜的程序猿 2021-09-26
821

Netty核心组件

Netty核心概念

  • Channel:对应NIO的SocketChannel

  • EventLoop:对应NIO的Selector

  • ChannelPipeline:业务逻辑

  • ChannelHandler:业务逻辑

  • Boostrap:可以选择阻塞或者非阻塞,可以选择有几个selector

  • ByteBuffer:用来存储发送过来的字节

  • Codec: 帮我们做编码解码工作

EventLoop

EventLoop=线程+selector,在nodeJs里面也有用到,EventLoop顾名思义就是不停的轮询。

  • select():原本的select()

  • processSelectedKeys():处理IO事件(最重要的地方)

  • runAllTasks:处理应用层代码

  • task queue:用于存放task,当存放的task和当前线程相符就会拿出来执行。

ps:特别注意,runAllTasks和processSelectedKeys不能用阻塞的代码。

整体模型如下图所示:

EventLoopGroup

为了提高更高的处理效率,引入了EventLoop组(selector的名字就叫多路复用),模型如下图所示:

ChannelHandler&Pipeline  

  • ChannelHandle:进行事件的处理,例如channelRead或channelActive,ChannelHandle被分为了两种类型:

    • 内置头handleCtx:每个流程都得进过这个头结点的handle

    • 内置尾TailCtx:每个流程都得以TailCtx结束

    • ChannellnboundHandler:处理对于程序而言,外部的事件

    • ChannelOutboundHandler:处理对于程序而言,内部的事件

对于每个Handler,netty可以帮我们处理byteBuf或Message,处理好以后按原本的格式输出。

  • ChannelPipeline(流水线):里面包含了很多的Handle,是netty封装好的一个流水线。


运行过程

整体过程:事件->selector->信道channel->流水線pipeline->走handler流程

生命周期

  • channelRegistered:注册事件

  • channelActive:检查信道是否活跃

  • channelInactive:信道不活跃

  • channelUnregistered:取消注册的事件

各组件之间的关系

  • EVentLoopGroup和EventLoop:1对N,没什么讲的,就像线程和线程池一样

  • EventLoop和Thread:1对1,为了保证执行的快,不会引起线程间的通信的问题

  • EventLoop和Channel:1对N,轮询每个信道,这也就是为什么信道不能阻塞的原因,因为一旦阻塞,单线程的EventLoop就会一直等待到当前这个线程执行完毕

  • Channel和ChannelPipeline:1对1,一个Channel只能有一个ChannelPipeline

  • ChannelPipeline和ChannelHandle:1对N的关系,一个ChannelPipeline内有多个ChannelHandle参与流程

  • ChannelHandle和ChannelPipeline:1对N,一个ChannelHandle可以被多个ChannelPipeline调用使用

案例

学了那么多基础知识,写个案例巩固下,写完案例后着手开始源码学习。项目结构如下:

EchoHandler代码如下:

/**
* ChannelInboundHandlerAdapter 主动帮我们去使用下一个handler
* 不用关注于下一个handler的执行
*/
public class EchoHandler extends ChannelInboundHandlerAdapter {
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       String in=(String) msg;
       ctx.channel().writeAndFlush(in);
       ReferenceCountUtil.release(msg);
  }
}

IProtocalHandler代码如下:

public class IProtocalHandler extends ChannelInboundHandlerAdapter {

   @Override
   public void channelRead(ChannelHandlerContext ctx, final Object msg) throws Exception {
       int sleep = 500 * new Random().nextInt(5);
       System.out.println("sleep:" + sleep);
       Thread.sleep(sleep);

       final ByteBuf buf = (ByteBuf) msg;
       char c1 = (char) buf.readByte();
       char c2 = (char) buf.readByte();

       if (c1 != 'J' || c2 != 'W') {
           ctx.fireExceptionCaught(new Exception("magic error"));
           return ;
      }

       buf.readShort();//skip length

       String outputStr = buf.toString(CharsetUtil.UTF_8);
       System.out.println(outputStr);

       ctx.channel().writeAndFlush(outputStr+"\n");

  }
}

PipelinePrintHandler代码如下:

/**
* ChannelInboundHandlerAdapter 主动帮我们去使用下一个handler
* 不用关注于下一个handler的执行
*/
public class PipelinePrintHandler extends ChannelInboundHandlerAdapter {
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
       super.channelActive(ctx);
       System.out.println(ctx.pipeline().names());
  }
}

PrintInboundHandler代码如下:

public class PrintInboundHandler implements ChannelInboundHandler {
   private final String id ;

   public PrintInboundHandler(String id) {
       this.id = id;
  }

   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
       System.out.println("handlerAdded " + id);

  }

   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
       System.out.println("handlerRemoved "+ id);

  }

   public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
       System.out.println("channelRegistered "+ id);
       ctx.fireChannelRegistered();

  }

   public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
       System.out.println("channelUnregistered "+ id);
       ctx.fireChannelUnregistered();

  }

   public void channelActive(ChannelHandlerContext ctx) throws Exception {
       System.out.println("channelActive "+ id);
       ctx.fireChannelActive();

  }

   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       System.out.println("channelInactive "+ id);
       ctx.fireChannelInactive();
  }

   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       System.out.println("channelRead "+ id);
       ctx.fireChannelRead(msg);
       //ctx.channel().pipeline().fireChannelRead(msg);
  }

   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
       System.out.println("channelReadComplete "+ id);
       ctx.fireChannelReadComplete();
  }

   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
       System.out.println("userEventTriggered "+ id);

  }

   public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
       System.out.println("channelWritabilityChanged "+ id);

  }

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       System.out.println("exceptionCaught "+ id);

  }

}

Server代码如下:

public class Server {

   private static  void use(ChannelPipeline pipeline, Consumer<ChannelPipeline> strategy){
       strategy.accept(pipeline);
  }
   private static Consumer<ChannelPipeline> echo= p ->{
       p.addLast(
               new LineBasedFrameDecoder(80,false,false),
               new StringDecoder(),
               new EchoHandler(),
               new PipelinePrintHandler(),
               new StringEncoder(StandardCharsets.UTF_8)
      );
  };
   private  static Consumer<ChannelPipeline> print =p -> {
     p.addLast(
             new PrintInboundHandler("id1")
    );
  };

   private static  Consumer<ChannelPipeline> decode= p->{
     p.addLast(new LengthFieldBasedFrameDecoder(1024,2,2,-2,0))
      .addLast(new DefaultEventExecutorGroup(16),new IProtocalHandler())
      .addLast(new StringEncoder(CharsetUtil.UTF_8));
  };

   private static  void start(int port) throws InterruptedException {
       // bossGroups,是专门做accept功能用的
       EventLoopGroup bossGroup = new NioEventLoopGroup();
       //workerGroup,对应read、send等其他操作
       EventLoopGroup workerGroup = new NioEventLoopGroup();
       try {
           ServerBootstrap b=new ServerBootstrap();
           b.group(bossGroup,workerGroup);
           b.channel(NioServerSocketChannel.class);
           b.childHandler(new ChannelInitializer() {
               @Override
               protected void initChannel(Channel ch) throws Exception {
                   use(ch.pipeline(), echo);
              }
          });
           ChannelFuture f=b.bind(port).sync();
           f.channel().closeFuture().sync();
      } finally {
           bossGroup.shutdownGracefully();
           workerGroup.shutdownGracefully();
      }

  }
   public static void main(String[] args) throws InterruptedException {
       start(2020);
  }
}

接下去的话,我们讲一讲源码:Netty源码

文章转载自爱做菜的程序猿,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论