Netty核心组件
Netty核心概念
Channel:对应NIO的SocketChannel
EventLoop:对应NIO的Selector
ChannelPipeline:业务逻辑
ChannelHandler:业务逻辑
Boostrap:可以选择阻塞或者非阻塞,可以选择有几个selector
ByteBuffer:用来存储发送过来的字节
Codec: 帮我们做编码解码工作
EventLoop
EventLoop=线程+selector,在nodeJs里面也有用到,EventLoop顾名思义就是不停的轮询。
select():原本的select()
processSelectedKeys():处理IO事件(最重要的地方)
runAllTasks:处理应用层代码
task queue:用于存放task,当存放的task和当前线程相符就会拿出来执行。
ps:特别注意,runAllTasks和processSelectedKeys不能用阻塞的代码。
整体模型如下图所示:

EventLoopGroup
为了提高更高的处理效率,引入了EventLoop组(selector的名字就叫多路复用),模型如下图所示:

ChannelHandler&Pipeline
ChannelHandle:进行事件的处理,例如channelRead或channelActive,ChannelHandle被分为了两种类型:
内置头handleCtx:每个流程都得进过这个头结点的handle
内置尾TailCtx:每个流程都得以TailCtx结束
ChannellnboundHandler:处理对于程序而言,外部的事件
ChannelOutboundHandler:处理对于程序而言,内部的事件
对于每个Handler,netty可以帮我们处理byteBuf或Message,处理好以后按原本的格式输出。
ChannelPipeline(流水线):里面包含了很多的Handle,是netty封装好的一个流水线。

运行过程
整体过程:事件->selector->信道channel->流水線pipeline->走handler流程

生命周期
channelRegistered:注册事件
channelActive:检查信道是否活跃
channelInactive:信道不活跃
channelUnregistered:取消注册的事件

各组件之间的关系
EVentLoopGroup和EventLoop:1对N,没什么讲的,就像线程和线程池一样
EventLoop和Thread:1对1,为了保证执行的快,不会引起线程间的通信的问题
EventLoop和Channel:1对N,轮询每个信道,这也就是为什么信道不能阻塞的原因,因为一旦阻塞,单线程的EventLoop就会一直等待到当前这个线程执行完毕
Channel和ChannelPipeline:1对1,一个Channel只能有一个ChannelPipeline
ChannelPipeline和ChannelHandle:1对N的关系,一个ChannelPipeline内有多个ChannelHandle参与流程
ChannelHandle和ChannelPipeline:1对N,一个ChannelHandle可以被多个ChannelPipeline调用使用
案例
学了那么多基础知识,写个案例巩固下,写完案例后着手开始源码学习。项目结构如下:

EchoHandler代码如下:
/**
* ChannelInboundHandlerAdapter 主动帮我们去使用下一个handler
* 不用关注于下一个handler的执行
*/
public class EchoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String in=(String) msg;
ctx.channel().writeAndFlush(in);
ReferenceCountUtil.release(msg);
}
}
IProtocalHandler代码如下:
public class IProtocalHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, final Object msg) throws Exception {
int sleep = 500 * new Random().nextInt(5);
System.out.println("sleep:" + sleep);
Thread.sleep(sleep);
final ByteBuf buf = (ByteBuf) msg;
char c1 = (char) buf.readByte();
char c2 = (char) buf.readByte();
if (c1 != 'J' || c2 != 'W') {
ctx.fireExceptionCaught(new Exception("magic error"));
return ;
}
buf.readShort();//skip length
String outputStr = buf.toString(CharsetUtil.UTF_8);
System.out.println(outputStr);
ctx.channel().writeAndFlush(outputStr+"\n");
}
}
PipelinePrintHandler代码如下:
/**
* ChannelInboundHandlerAdapter 主动帮我们去使用下一个handler
* 不用关注于下一个handler的执行
*/
public class PipelinePrintHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println(ctx.pipeline().names());
}
}
PrintInboundHandler代码如下:
public class PrintInboundHandler implements ChannelInboundHandler {
private final String id ;
public PrintInboundHandler(String id) {
this.id = id;
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded " + id);
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved "+ id);
}
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered "+ id);
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered "+ id);
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive "+ id);
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive "+ id);
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead "+ id);
ctx.fireChannelRead(msg);
//ctx.channel().pipeline().fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete "+ id);
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("userEventTriggered "+ id);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged "+ id);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught "+ id);
}
}
Server代码如下:
public class Server {
private static void use(ChannelPipeline pipeline, Consumer<ChannelPipeline> strategy){
strategy.accept(pipeline);
}
private static Consumer<ChannelPipeline> echo= p ->{
p.addLast(
new LineBasedFrameDecoder(80,false,false),
new StringDecoder(),
new EchoHandler(),
new PipelinePrintHandler(),
new StringEncoder(StandardCharsets.UTF_8)
);
};
private static Consumer<ChannelPipeline> print =p -> {
p.addLast(
new PrintInboundHandler("id1")
);
};
private static Consumer<ChannelPipeline> decode= p->{
p.addLast(new LengthFieldBasedFrameDecoder(1024,2,2,-2,0))
.addLast(new DefaultEventExecutorGroup(16),new IProtocalHandler())
.addLast(new StringEncoder(CharsetUtil.UTF_8));
};
private static void start(int port) throws InterruptedException {
// bossGroups,是专门做accept功能用的
EventLoopGroup bossGroup = new NioEventLoopGroup();
//workerGroup,对应read、send等其他操作
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup,workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
use(ch.pipeline(), echo);
}
});
ChannelFuture f=b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
start(2020);
}
}
接下去的话,我们讲一讲源码:Netty源码




