暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

activeMQ的Pub/Sub模型简单例子

畅谈Fintech 2021-06-25
676


上一次分享了activeMQ的P2P模型的简单例子这次给大家介绍activeMQ的Pub/Sub模型。






01


Pub/Sub模型


包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber)多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

如果希望发送的消息可以不被做任何处理、或者只被一个消费者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。




02



订阅者Subscriber


1. 订阅者Subscriber01类
    package consumer;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;


    public class Subscriber01
    {
        public static void main(String[] args) throws JMSException
        {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    Connection connection = connectionFactory.createConnection();
            //持久订阅,需要分配一个id来表示订阅者
    connection.setClientID("subscriber-01");
            connection.start();
            Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            //创建1个topic
    Topic topic = session.createTopic("activeMQ-topic-1");
    MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"subscriber-01");
            //接受消息,采用同步的方式
    TextMessage textMessage = (TextMessage) messageConsumer.receive();
    //把接受到的消息显示在控制台上
    System.out.println("订阅者01接受到的信息是:" + textMessage.getText());
            //关闭资源
    messageConsumer.close();
    session.close();
    connection.close();
    }
    }


    2. 订阅者Subscriber02类


      package consumer;
      import org.apache.activemq.ActiveMQConnectionFactory;
      import javax.jms.*;


      public class Subscriber02
      {
          public static void main(String[] args) throws JMSException
          {
      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
              Connection connection  = connectionFactory.createConnection();
      持久订阅,需要分配一个id来表示订阅者
              connection.setClientID("subscriber-02");
              connection.start();
              Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
      创建1个topic
      Topic topic = session.createTopic("activeMQ-topic-1");
              MessageConsumer  messageConsumer = session.createDurableSubscriber(topic,"subscriber-02");
      接受消息,采用同步的方式
      TextMessage textMessage = (TextMessage) messageConsumer.receive();
      把接受到的消息显示在控制台上
      System.out.println("订阅者02接受到的信息是:" + textMessage.getText());
              //关闭资源
      messageConsumer.close();
      session.close();
      connection.close();
      }
      }



      3. 运行Subscriber01和02程序后后,查看后台activemq界面

      看到两个topic。







      03





      发布者Publisher



      1.发布者publisher代码

        package product;
        import org.apache.activemq.ActiveMQConnectionFactory;
        import javax.jms.*;
        public class Publisher
        {
            public static void main(String[] args) throws JMSException
            {
        创建链接,链接到1个activeMQ的broker实例上,端口使用active默认的61616。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection = connectionFactory.createConnection();
                //开启链接
        connection.start();
                //创建1个session会话,第一个参数:是否支持事务,先选择不支持。第二个参数:表示签收的模式,这里选择自动签收。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建1个topic的名字叫activeMQ-1
        Topic topic = session.createTopic("activeMQ-topic-1");
                //创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);
                //创建1个消息
        TextMessage textMessage = session.createTextMessage("welcome to learn activeMQ-topic!");
                //发送消息
        messageProducer.send(textMessage);
                //关闭资源
        messageProducer.close();
        session.close();
                connection.close();
             }
        }


        2. 运行Publisher后,查看后台active mq界面。






        04




        查看控制台输出





        1.订阅者subscriber01接受到消息




        2.订阅者subscriber01接受到消息






        05


        P2P对比PUB/SUB


        内容
        P2P
        PUB/SUB


         角色

        Queue、

        Sender、

        Receiver

        Topic、

        Publisher、

        Subscriber

        消息与消费者关系


        1个消息1个消费者。一旦被1个消费者消费了,这个消息就不在队列里面了。1个消息多个消费者。

        发送者和接受者在时间上的依赖性

        在时间上没有依赖性。

        当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

        严格的时间依赖性。针对某个主题(Topic)必须先创建一个订阅者之后,才能消费发布者的消息。为了消费消息,订阅者必须保持运行的状。为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。




        文章转载自畅谈Fintech,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论