点对点(point to point)消息发送
消息生产者生产消息发送到 queue 中,然后消息消费者从 queue 中取出并且消费消息。消息被消费以后,queue 中的消息被删掉,一个 queue 可以有多个消费者,但是对一个消息而言,只会有一个消费者可以消费到该消息。
点对点消息发送演示过程
1、启动 ActiveMQ
打开 ActiveMQ 管理界面,点开 Queues 选项卡,没有队列,如下:

2、ActiveMQ 的依懒包
ActiveMQ 开发只需引入 activemq-all-5.15.0.jar 这一个包即可,因为它集成了所有开发需要的jar包。
<!-- activemq 依懒包 --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.0</version></dependency>
3、建立消息生产者的工程,producer
新建一个类 MessageSender 用于发送点对点消息
package producer;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 点对点(point to point)消息发送** @author JPM*/public class MessageSender {public static void main(String[] args) {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 创建连接工厂Connection connection = null;try {connection = connectionFactory.createConnection(); // 创建连接connection.start(); // 启动连接// 创建session,两个参数分别表示:是否启动事务,消息确认模式Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);// 创建消息的目的地,createQueue表示创建的是队列消息Destination destination = session.createQueue("queue_01");// 创建消息生产者MessageProducer producer = session.createProducer(destination);// 创建需要发送的消息TextMessage textMessage = session.createTextMessage("hello,queue_01!");// 发送消息producer.send(textMessage);// 开启事务的时候,消息发送必须使用commit提交session.commit();session.close();} catch (JMSException e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}
4、运行消息生产者类,查看 ActiveMQ 管理界面
运行消息生产者类 MessageSender,向队列 “queue_01” 发送一条消息 “hello,queue_01!” ,打开 ActiveMQ 管理界面,如下:

此时说明消息生产者向队列写入消息成功。
5、建立消息消费者的工程,consumer
新建一个类 MessageReceiver 用于接收点对点消息
package consumer;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 点对点(point to point)消息接收** @author JPM*/public class MessageReceiver {public static void main(String[] args) {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection connection = null;try {connection = connectionFactory.createConnection();connection.start();// 表示消息由客户端自动确认Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue("queue_01");// 创建消息消费者MessageConsumer consumer = session.createConsumer(destination);TextMessage textMessage = (TextMessage) consumer.receive(); // 接收消息System.out.println("MessageReceiver--->" + textMessage.getText());session.close();} catch (JMSException e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}
6、运行消息消费者类,查看 ActiveMQ 管理界面
运行消息生产者类 MessageReceiver,从队列 “queue_01” 后去一条消息,并输出到控制台 ,控制台显示:
MessageReceiver--->hello,queue_01!
打开 ActiveMQ 管理界面,如下:

至此说明队列里的消息已经被消费掉。
**7、消息消费者监听消息的关键代码**
// 创建消息消费者MessageConsumer consumer = session.createConsumer(destination);// 监听消息consumer.setMessageListener(new MessageListener() {public void onMessage(Message msg) {TextMessage textMessage = (TextMessage)msg;try {System.out.println("MessageReceiver--->" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});
文章转载自追梦Java,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




