01
—
Pub/Sub模型
包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber)多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

如果希望发送的消息可以不被做任何处理、或者只被一个消费者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
02
—
订阅者Subscriber
package consumer;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Subscriber01{public static void main(String[] args) throws JMSException{ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");Connection connection = connectionFactory.createConnection();//持久订阅,需要分配一个id来表示订阅者connection.setClientID("subscriber-01");connection.start();Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//创建1个topicTopic topic = session.createTopic("activeMQ-topic-1");MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"subscriber-01");//接受消息,采用同步的方式TextMessage textMessage = (TextMessage) messageConsumer.receive();//把接受到的消息显示在控制台上System.out.println("订阅者01接受到的信息是:" + textMessage.getText());//关闭资源messageConsumer.close();session.close();connection.close();}}
2. 订阅者Subscriber02类
package consumer;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Subscriber02{public static void main(String[] args) throws JMSException{ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");Connection connection = connectionFactory.createConnection();持久订阅,需要分配一个id来表示订阅者connection.setClientID("subscriber-02");connection.start();Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);创建1个topicTopic topic = session.createTopic("activeMQ-topic-1");MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"subscriber-02");接受消息,采用同步的方式TextMessage textMessage = (TextMessage) messageConsumer.receive();把接受到的消息显示在控制台上System.out.println("订阅者02接受到的信息是:" + textMessage.getText());//关闭资源messageConsumer.close();session.close();connection.close();}}
3. 运行Subscriber01和02程序后后,查看后台activemq界面
看到两个topic。


03
—
发布者Publisher
1.发布者publisher代码
package product;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Publisher{public static void main(String[] args) throws JMSException{创建链接,链接到1个activeMQ的broker实例上,端口使用active默认的61616。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");Connection connection = connectionFactory.createConnection();//开启链接connection.start();//创建1个session会话,第一个参数:是否支持事务,先选择不支持。第二个参数:表示签收的模式,这里选择自动签收。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建1个topic的名字叫activeMQ-1Topic topic = session.createTopic("activeMQ-topic-1");//创建消息的生产者MessageProducer messageProducer = session.createProducer(topic);//创建1个消息TextMessage textMessage = session.createTextMessage("welcome to learn activeMQ-topic!");//发送消息messageProducer.send(textMessage);//关闭资源messageProducer.close();session.close();connection.close();}}
2. 运行Publisher后,查看后台active mq界面。

04
—
查看控制台输出
1.订阅者subscriber01接受到消息

2.订阅者subscriber01接受到消息

05
—
P2P对比PUB/SUB
| 内容 | P2P | PUB/SUB |
角色 | Queue、 Sender、 Receiver | Topic、 Publisher、 Subscriber |
消息与消费者关系 | 1个消息1个消费者。一旦被1个消费者消费了,这个消息就不在队列里面了。 | 1个消息多个消费者。 |
发送者和接受者在时间上的依赖性 | 在时间上没有依赖性。 当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。 | 严格的时间依赖性。针对某个主题(Topic)必须先创建一个订阅者之后,才能消费发布者的消息。为了消费消息,订阅者必须保持运行的状。为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。 |
文章转载自畅谈Fintech,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




