暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【Netty】心跳机制与断线重连

爱编码 2021-07-20
1943

欢迎关注公众号:【爱编码】 如果有需要后台回复2019赠送1T的学习资料哦!!

心跳是啥

在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.

心跳机制的工作原理

心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.

实现心跳

在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件.

  1. public IdleStateHandler(

  2. int readerIdleTimeSeconds,

  3. int writerIdleTimeSeconds,

  4. int allIdleTimeSeconds) {


  5. this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,

  6. TimeUnit.SECONDS);

  7. }

IdleStateHandler构造函数需要提供三个参数:

  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个READER_IDLE 的 IdleStateEvent 事件.

  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE的 IdleStateEvent 事件.

  • allIdleTimeSeconds, 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

服务端

服务端启动初始化代码

服务器的初始化部分为 pipeline 添加了三个 Handler,其中IdleStateHandler就是心跳处理Handler

  1. public class HeartbeatServer {



  2. public static void main(String[] args) throws InterruptedException {



  3. EventLoopGroup bossGroup = new NioEventLoopGroup();

  4. EventLoopGroup workerGroup = new NioEventLoopGroup();


  5. try {

  6. ServerBootstrap b = new ServerBootstrap();


  7. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

  8. .option(ChannelOption.SO_BACKLOG, 1024)

  9. .childHandler(new ChannelInitializer<SocketChannel>() {

  10. @Override

  11. protected void initChannel(SocketChannel socketChannel) throws Exception {

  12. socketChannel.pipeline().addLast(new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS));

  13. socketChannel.pipeline().addLast(new StringDecoder());

  14. socketChannel.pipeline().addLast(new HeartBeatServerHandler());

  15. }

  16. });


  17. ChannelFuture f = b.bind(8089).sync();

  18. f.channel().closeFuture().sync();

  19. }catch (Exception e){

  20. e.printStackTrace();

  21. }finally {

  22. workerGroup.shutdownGracefully().sync();

  23. bossGroup.shutdownGracefully().sync();

  24. }


  25. }

  26. }

服务端心跳处理Handler

IdleStateHandler 是实现心跳的关键, 它会根据不同的 IO idle 类型来产生不同的 IdleStateEvent 事件, 而这个事件的捕获, 其实就是在 userEventTriggered 方法中实现的.

  1. public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {

  2. private int lossConnectCount = 0;


  3. @Override

  4. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

  5. System.out.println("已经5秒未收到客户端的消息了!");

  6. if (evt instanceof IdleStateEvent){

  7. IdleStateEvent event = (IdleStateEvent)evt;

  8. if (event.state()== IdleState.READER_IDLE){

  9. lossConnectCount++;

  10. if (lossConnectCount>2){

  11. System.out.println("关闭这个不活跃通道!");

  12. ctx.channel().close();

  13. }

  14. }




  15. }else {

  16. super.userEventTriggered(ctx,evt);

  17. }

  18. }


  19. @Override

  20. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

  21. lossConnectCount = 0;

  22. System.out.println("client says: "+msg.toString());

  23. }


  24. @Override

  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

  26. ctx.close();

  27. }

  28. }

客户端

客户端启动初始化代码

心跳的配置也是跟服务端一样,往pipeline中添加IdleStateHandler,其中的参数可以自己随意配置。

  1. public class HeartBeatClient {


  2. public static void main(String[] args) throws Exception {


  3. EventLoopGroup group = new NioEventLoopGroup();


  4. try {

  5. Bootstrap b = new Bootstrap();

  6. b.group(group).channel(NioSocketChannel.class)

  7. .handler(new ChannelInitializer<SocketChannel>() {

  8. @Override

  9. protected void initChannel(SocketChannel socketChannel) throws Exception {

  10. socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));

  11. socketChannel.pipeline().addLast(new StringEncoder());

  12. socketChannel.pipeline().addLast(new HeartBeatClientHandler());

  13. }

  14. });


  15. ChannelFuture f = b.connect(new InetSocketAddress(8089)).sync();

  16. f.channel().closeFuture().sync();

  17. } catch (InterruptedException e) {

  18. e.printStackTrace();

  19. }finally {

  20. group.shutdownGracefully().sync();

  21. }

  22. }

  23. }

客户端心跳处理代码

关键也是在userEventTriggered方法中实现的,主要的逻辑就是往服务端发送心跳,发了3次就不发了,这时候就会触发服务端的userEventTriggered中lossConnectCount 如果超过2次就把这个通道给断开。也就是把这个客户端给断开。

  1. public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {


  2. private int curTime = 0;

  3. private int beatTime = 3;


  4. @Override

  5. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

  6. System.out.println("==channelRead===");

  7. System.out.println(msg.toString());

  8. }




  9. @Override

  10. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

  11. System.out.println("客户端循环心跳监测发送: "+new Date());


  12. if (evt instanceof IdleStateEvent){

  13. IdleStateEvent event = (IdleStateEvent)evt;

  14. if (event.state()== IdleState.WRITER_IDLE){

  15. if (curTime<beatTime) {

  16. curTime++;

  17. ctx.writeAndFlush("biubiu");

  18. }

  19. }

  20. }

  21. }

  22. }

小结

Netty心跳的做法大致就是如此,

1.利用 IdleStateHandler来产生对应的 idle 事件。 2.userEventTriggered中做好心跳交互逻辑。

至于更加复杂的逻辑,还是各位遇到的时候自己发挥。

断线重连

服务端代码依旧是上面的不变。

客户端

主要工作以及初始化代码如下:

1.通过 channel().eventLoop().schedule
来延时10s 后尝试重新连接.

  1. public class ReconnectClient {



  2. private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);

  3. private Channel channel;

  4. private Bootstrap bootstrap;


  5. public static void main(String[] args) throws Exception {

  6. ReconnectClient client = new ReconnectClient();

  7. client.start();

  8. client.sendData();

  9. }


  10. public void sendData() throws Exception {

  11. Random random = new Random(System.currentTimeMillis());

  12. for (int i = 0; i < 10000; i++) {

  13. if (channel != null && channel.isActive()) {

  14. channel.writeAndFlush("ReconnectClient心跳來了呀.....");

  15. }


  16. Thread.sleep(random.nextInt(20000));

  17. }

  18. }


  19. public void start() {

  20. try {

  21. bootstrap = new Bootstrap();

  22. bootstrap

  23. .group(workGroup)

  24. .channel(NioSocketChannel.class)

  25. .handler(new ChannelInitializer<SocketChannel>() {

  26. @Override

  27. protected void initChannel(SocketChannel ch) throws Exception {

  28. ch.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));

  29. ch.pipeline().addLast(new StringEncoder());

  30. ch.pipeline().addLast(new ReconnectClientHandler(ReconnectClient.this));

  31. }

  32. });

  33. doConnect();


  34. } catch (Exception e) {

  35. throw new RuntimeException(e);

  36. }

  37. }


  38. protected void doConnect() {

  39. if (channel != null && channel.isActive()) {

  40. return;

  41. }


  42. ChannelFuture future = bootstrap.connect(new InetSocketAddress(8089));


  43. future.addListener(new ChannelFutureListener() {

  44. @Override

  45. public void operationComplete(ChannelFuture futureListener) throws Exception {

  46. if (futureListener.isSuccess()) {

  47. channel = futureListener.channel();

  48. System.out.println("Connect to server successfully!");

  49. } else {

  50. System.out.println("Failed to connect to server, try connect after 10s");


  51. futureListener.channel().eventLoop().schedule(new Runnable() {

  52. @Override

  53. public void run() {

  54. doConnect();

  55. }

  56. }, 10, TimeUnit.SECONDS);

  57. }

  58. }

  59. });

  60. }

  61. }

断线重连处理Handler

2.断线重连的关键一点是检测连接是否已经断开. 因此我们重写了 channelInactive 方法. 当 TCP 连接断开时, 会回调channelInactive方法, 因此我们在这个方法中调用 client.doConnect() 来进行重连.

  1. public class ReconnectClientHandler extends ChannelInboundHandlerAdapter {


  2. private int curTime = 0;

  3. private int beatTime = 3;


  4. private ReconnectClient client;

  5. public ReconnectClientHandler(ReconnectClient client) {

  6. this.client = client;

  7. }


  8. @Override

  9. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

  10. System.out.println("ReconnectClientHandler客户端循环心跳监测发送: "+new Date());


  11. if (evt instanceof IdleStateEvent){

  12. IdleStateEvent event = (IdleStateEvent)evt;

  13. if (event.state()== IdleState.WRITER_IDLE){

  14. if (curTime<beatTime) {

  15. curTime++;

  16. ctx.writeAndFlush("ReconnectClientHandler=biubiu.....");

  17. }

  18. }

  19. }

  20. }


  21. @Override

  22. public void channelInactive(ChannelHandlerContext ctx) throws Exception {

  23. super.channelInactive(ctx);

  24. client.doConnect();

  25. System.out.println("重新連接了呀。。。。");

  26. }

  27. }

总结

心跳机制与断线重连的基本步骤如上所述。

最后

如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。关注公众号【爱编码】,回复2019有相关资料哦。


文章转载自爱编码,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论