在网络通信中,当系统的负载压力比较大的时候,系统进入负荷状态。此时,如果系统全力处理业务请求,可能会导致消息积压或者应用宕机。这种时候可以拒绝一定的请求,相当于做限流。
除了动态限流,有时候还需要对消息的读取和发生速度进行控制,这样可以保护下游的处理层不受突发流量的冲击。
在Netty中提供了流量整形的处理器ChannelTrafficShappingHandler来对消息读取和写入的速度做限制,使其保证一个稳定的速率。
下面我们来看看ChannelTrafficShappingHandler源码。
首先,ChannelTrafficShappingHandler继承自
AbstractTrafficShappingHandler。
1、构造方法
ChannelTrafficShappingHandler有三个构造方法,如下的构造方法传入写入限流的字节,即writeLimit的单位是Byte,即checkInterval周期内允许写入最大writeLimit字节。允许读取readLimit字节数。

其内部调用父类的构造方法,接着我们去其父类
AbstractTrafficShappingHandler看看。

从源码可以看成,其设置了内部的几个参数。
2、channelRead方法


从源码可以看成,首先调用calculateSize来计算msg的长度。因为本Handler是添加在Pipeline上的第一个Handler,因此从缓冲区读出的消息率先进入此handler。然后记录当前的系统时间。
然后进入trafficCounter判断是否为空。这里的trafficCounter是在
ChannelTrafficShappingHandler类中通信的channel被添加到pipeline上就进行了初始化,然后调用父类AbstractTrafficShappingHandler中的
setTrafficCounter方法设置进去。

我们继续看channelRead方法的源码。由于trafficCounter不为空,然后计算wait时间。这里的wait时间是计算需要暂停读取消息的时间,源码如下。
如果当前时间减去上一次的时间太小,那么返回0。否则进行计算等待时间。 这里我们假如读取到缓冲区数据2000byte,但是设置的readlimit为1000,那么(byte*1000/limit)=2000ms,相当于当前读取的字节应该是2000ms发送完。然后减去interval,也就是去掉消耗的时间,还剩1500ms需要等待。

如果wait的时间大于最小等待间隔MINIAL_WAIT=10ms,则进行限流操作。如果当前的ctx没有被暂停,则设置暂停读取状态。
如果当前设置暂停读取,则封装成一个reopen_task,等待wait时间后再重新设置可读状态并去从缓冲区读取数据。如果当前的ctx已经暂停了,则封装成一个Runnable,等待wait时间后向后面的handler传递消息。
3、write方法
write方法是限制写出的速率。具体源码如下:

write方法的源码,在计算出wait时间后,然后使用当前EventLoop线程的schedule方法等待wait时间后再将当前msg传递下去。
此外,还有个全局有效的整流Handler类:GlobalTrafficShapingHandler。它可以针对某个进程所有的链路进行消息发送和消息读取速度的控制。




