我准备战斗到最后,不是因为我勇敢,是我想见证一切。 --双雪涛《猎人》
我准备战斗到最后,不是因为我勇敢,是我想见证一切。 --双雪涛《猎人》
TCP的粘包拆包问题
HearBeat 心跳检测的实现
断点重连详解
❝❞
一个技术,为什么要用它,解决了那些问题? 如果不用会怎么样,有没有其它的解决方法? 对比其它的解决方案,为什么最终选择了这种,都有何利弊? 你觉得项目中还有那些地方可以用到,如果用了会带来那些问题? 这些问题你又如何去解决的呢?
「本文基于Netty 4.1.45.Final-SNAPSHOT」
TCP的粘包拆包问题
由于TCP是一个流协议,一个没有界限的一长串二进制数据,TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。
「解决方案」
1)消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
2)在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不能出现分隔符。
3)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。
Netty提供了多个解码器,可以进行分包的操作,如下:
LineBasedFrameDecoder (回车换行分包) DelimiterBasedFrameDecoder(特殊分隔符分包) FixedLengthFrameDecoder(固定长度报文来分包)
❝实际业务中,以第三种最为常见。在传递的数据中,添加一个前缀,int类型,未固定长度4个字节。然后后驱根据具体的字节长度读取固定的长度
具体的实现代码
❞
HearBeat 心跳检测的实现
其实一提到心跳,就会想到长连接,其实长连接只是一个说法,只要连接存在保活机制,一会将该连接保证活动。所以这也就是逻辑上的长连接
Netty中的心跳检测被运用到非常多的框架中,现在一起来看看具体的实现呢?首先要在自己的netty程序中添加心跳机制,基于netty的编码风格,整个netty的心跳都是由io.netty.handler.timeout.IdleStateHandler
handler类来实现的。首先看看其构造方法呢?
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件. writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件. allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
在程序中引入心跳机制,那么就需要在pipeline管道中添加handler。
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
看一下IdleStateHandler
源码的实现逻辑
首先看一下channelRead方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
❝这里并没有做具体的操作逻辑,而是将操作传递给下一个handler了。
❞
再看看ChannelActive方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
}
❝这里有一个非常重要的
❞initialize(ctx);

❝这里创建了一个
❞ReaderIdleTimeoutTask
的task,查看一下该类中的 run方法。

❝❞
这里存在的逻辑就是,如果获取的nextDelay 为负数,则表明心跳超时,调用
userEventTriggered
自定义逻辑image-20210422221432086 如果不超时,则还是会调用
schedule
方法,这里为什么不讲schedule
设置为一个Timer的定时任务呢?
原因其实很巧妙,如果设置为定时任务,那么每一次请求的,由于网络的问题,那么有可能第一次心跳在1秒完成,如果按照定时任务的方式来进行,那么第二次心跳肯定是5s以上,逻辑上根本就不正确了 现在的逻辑就是,每次将读取的时间跟上次心跳的事件做动态改变(有点类似于时间窗口的样子),因为由于第一次的方差已经固定好了,再再后面的计算中,永远的方差都将是3.(精妙的设计啊)
我的代码设计的是,超过三次心跳超时,则下线服务。
代码
断点重连详解
整个断点重连的实现思路,就是基于整个netty的事件驱动来实现的,当连接的服务器下线时,会触发客户端的channelInactive
回调。该操作直接循环调用重连方法就可以实现了。

代码
❝本文仅供笔者本人学习,有错误的地方还望指出,一起进步!望海涵!
「转载请注明出处!」
欢迎关注我的公共号,无广告,不打扰。不定时更新Java后端知识,我们一起超神。
——努力努力再努力xLg
加油!
❞






