暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

今天学习一下Netty的发布/订阅(Publish/Subscribe)

马士兵 2020-10-22
3184

消息传递有很多种方式,请求/响应(Request/Reply)是最常用的。在前面的博文的例子中,很多都是采用请求/响应的方式,当服务器接收到消息后,会立即write回写一条消息到客户端。HTTP协议也是基于请求/响应的方式。

但是请求/响应并不能满足所有的消息传递的需求,有些需求可能需要服务端主动推送消息到客户端,而不是被动的等待请求后再给出响应。

发布/订阅(Publish/Subscribe)是一种服务器主动发送消息到客户端的消息传递方式。订阅者 Subscriber 连接到服务器客户端后,相当于开始订阅发布者 Publisher 发布的消息,当发布者发布了一条消息后,所有订阅者都会接收到这条消息。

网络聊天室一般就是基于发布/订阅模式来实现。例如加入一个QQ群,就相当于订阅了这个群的所有消息,当有新的消息,服务器会主动将消息发送给所有的客户端。只不过聊天室里的所有人既是发布者又是订阅者。

下面用Netty实现简单的发布/订阅模式的服务器程序,连接到服务器的所有客户端都是订阅者,当发布者发布一条消息后,服务器会将消息转发给所有客户端。

Netty 提供了 ChannelGroup 来用于保存 Channel 组,ChannelGroup 是一个线程安全的 Channel 集合,它提供了一些列 Channel 批量操作。当一个 TCP 连接关闭后,对应的 Channel 会自动从 ChannelGroup 移除,所以不用手动去移除关闭的 Channel。

当有新的客户端连接到服务器,将对应的 Channel 加入到一个 ChannelGroup 中,当发布者发布消息时,服务器可以将消息通过 ChannelGroup 写入到所有客户端。

public class TcpServer {

 public static void main(String[] args) throws InterruptedException {
   EventLoopGroup bossGroup = new NioEventLoopGroup();
   EventLoopGroup workerGroup = new NioEventLoopGroup();
   try {
     ServerBootstrap b = new ServerBootstrap();
     b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() 
{
           @Override
           public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline pipeline = ch.pipeline();
             pipeline.addLast(new LineBasedFrameDecoder(80));
             pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
             pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
             pipeline.addLast(new TcpServerHandler());
           }
         });
     ChannelFuture f = b.bind(8080).sync();
     f.channel().closeFuture().sync();
   } finally {
     workerGroup.shutdownGracefully();
     bossGroup.shutdownGracefully();
   }
 }
}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

 // ChannelGroup用于保存所有连接的客户端,注意要用static来保证只有一个ChannelGroup实例,否则每new一个TcpServerHandler都会创建一个ChannelGroup
 private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

 @Override
 public void channelActive(ChannelHandlerContext ctx) {
   channels.add(ctx.channel()); // 将新的连接加入到ChannelGroup,当连接断开ChannelGroup会自动移除对应的Channel
 }

 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
   channels.writeAndFlush(msg + "\r\n"); // 接收到消息后,将消息发送到ChannelGroup中的所有客户端
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
   // cause.printStackTrace();  暂时把异常打印注释掉,因为PublishClient发布一条消息后会立即断开连接,而服务器也会向PublishClient发送消息,所以会抛出异常
   ctx.close();
 }
}

下面分别是两个客户端程序,一个是用于发布消息的客户端,一个是订阅消息的客户端。发布消息的客户端很简单,就是向服务器write一条消息即可:

public class PublishClient {

 public static void main(String[] args) throws IOException {

   Socket socket = null;
   OutputStream out = null;

   try {

     socket = new Socket("localhost"8080);
     out = socket.getOutputStream();
     out.write("Hello\r\n".getBytes()); // 发布信息到服务器
     out.flush();

   } finally {
     // 关闭连接
     out.close();
     socket.close();
   }
 }
}

订阅消息的客户端连接到服务器后,会阻塞等待接收服务器发送的发布消息:

public class SubscribeClient {

 public static void main(String[] args) throws IOException {

   Socket socket = null;
   BufferedReader in = null;

   try {

     socket = new Socket("localhost"8080);
     in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

     while (true) {
       String line = in.readLine(); // 阻塞等待服务器发布的消息
       if(line == null) {
         break;
       }
       System.out.println(line);
     }

   } finally {
     // 关闭连接
     in.close();
     socket.close();
   }
 }
}

下面进行测试:

  1. 测试时首先开启服务器;
  2. 然后再运行订阅消息的客户端 SubscribeClient,SubscribeClient 可以开启多个;
  3. 最后运行发布消息的客户端PublishClient,可以多次运行查看所有SubscribeClient的输出结果。

运行结果可以发现,当运行发布消息的客户端 PublishClient 发布一条消息到服务器时,服务器会主动将这条消息转发给所有的 TCP 连接,所有的订阅消息的客户端 SubscribeClient 都会接收到这条消息并打印出来。

如有收获请划至底部

点击“在看”支持,谢



关注马士兵

每天分享技术干货





点赞是最大的支持 

文章转载自马士兵,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论