
消息确认机制。
// 普通Confirmpublic class Publish1 {@Testpublic void testPublish() throws IOException {// 普通Confirm, confirm 只能确认消息有没有到达 exchange 无法保证是否到达queueConnection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();// 开启消息确认机制channel.confirmSelect();String queue = "queue";String msg = "消息!";channel.basicPublish("",queue,null,msg.getBytes());// 消息是否到达交换机try {if (channel.waitForConfirms()) {System.out.println("消息到达交换机");} else {System.out.println("消息未到达交换机");}} catch (Exception e) {e.printStackTrace();System.out.println("消息未到达交换机");}try {channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}}// 批量Confirmpublic class Publish2 {@Testpublic void testPublish() throws IOException {// 批量ConfirmConnection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();// 开启消息确认机制channel.confirmSelect();String queue = "queue";for (int i = 0; i < 10; i++) {String msg = "消息! " + i;channel.basicPublish("",queue,null,msg.getBytes());}try {// 有一个失败的时候,就直接全部失败channel.waitForConfirmsOrDie();System.out.println("消息到达交换机");} catch (Exception e) {e.printStackTrace();System.out.println("消息未到达交换机");}try {channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}}// 异步Confirmpublic class Publish3 {@Testpublic void testPublish() throws IOException {// 异步 ConfirmConnection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();// 开启消息确认机制channel.confirmSelect();try {channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {System.out.println("消息发送成功,标识:" + l + ",是否是批量" + b);}@Overridepublic void handleNack(long l, boolean b) throws IOException {System.out.println("消息发送失败,标识:" + l + ",是否是批量" + b);}});} catch (Exception e) {e.printStackTrace();System.out.println("消息未到达交换机");}String queue = "queue";String msg = "消息! 
";channel.basicPublish("",queue,null,msg.getBytes());System.in.read();try {channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}}// Return机制,监听消息是否从exchange送到了指定的queue中public class Publish4 {@Testpublic void testPublish() throws IOException, TimeoutException {// return 机制Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {// 当消息没有送达到queue时,才会执行。System.out.println(new String(bytes,"UTF-8") + "没有送达到Queue中!!");}});String queue = "queue";String msg = "消息! ";// 在发送消息时,指定mandatory参数为true,当队列没有接收到消息时,执行returnListener回调channel.basicPublish("",queue,true,null,msg.getBytes());System.in.read();channel.close();connection.close();}}
SpringBoot 实现。
// application.yml
spring:
  rabbitmq:
    # Key used by newer Spring Boot versions.
    # NOTE(review): the Spring AMQP reference describes ConfirmCallback together with the
    # `correlated` confirm type -- confirm that `simple` is really what is wanted here.
    publisher-confirm-type: simple
    # Enables the confirm mechanism.
    # NOTE(review): presumably the equivalent key for older versions -- keep only the key
    # that your Spring Boot version supports.
    publisher-confirms: true
    # Enables the return mechanism.
    publisher-returns: true
// RabbitMQConfirmAndReturn.java 开启Confirm和Return@Componentpublic class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 当前类创建完成后调用@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if(b){System.out.println("消息已经送达到Exchange");}else{System.out.println("消息没有送达到Exchange");}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println("消息没有送达到Queue");}}
避免消息重复消费。主要利用 Redis 的 SETNX 特性:只有当 key 不存在时才能写入成功,据此判断一条消息是否已经被处理过。
// pom.xml<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>// Publish.java 生产者// 指定messageIdAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(1) ///指定消息是否需要持久化 1 - 需要持久化 2 - 不需要持久化.messageId(UUID.randomUUID().toString()).build();for (int i = 0; i < 10; i++) {String queue = "queue";String msg = "消息! " + i;// 在发送消息时,指定mandatory参数为true,当队列没有接收到消息时,执行returnListener回调channel.basicPublish("",queue,true,properties,msg.getBytes());}// Consume.java 消费者// 监听DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {Jedis jedis = new Jedis("192.168.199.109", 6379);String messageId = properties.getMessageId();// setnx 存值,key不存在才能存值String result = jedis.set(messageId, "0", "NX", "EX", 10);if (result != null && "ok".equalsIgnoreCase(result)) {System.out.println("接收: " + new String(body, "UTF-8"));jedis.set(messageId, "1");channel.basicAck(envelope.getDeliveryTag(), false);} else {if ("1".equals(jedis.get(messageId))) {channel.basicAck(envelope.getDeliveryTag(), false);}}}};channel.basicConsume(queue, false, consumer);
SpringBoot 实现。
// pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

// application.yml
spring:
  redis:
    host: 192.168.199.109
    port: 6379
// 修改生产者@Testpublic void contextLoads() {CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());for (int i = 0; i < 10; i++) {String msg = "快红狗 " + i;rabbitTemplate.convertAndSend("boot-mq-exchange1","fast.red.dog",msg, messageId);}}// 修改消费者@Componentpublic class RabbitMQListener {@Autowiredprivate StringRedisTemplate redisTemplate;// 手动ack@RabbitListener(queues = "boot-mq-queue")public void receive(String msg, Channel channel, Message message) throws IOException {String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {System.out.println("接收: " + msg);redisTemplate.opsForValue().set(messageId, "1");channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else {if ("1".equals(redisTemplate.opsForValue().get(messageId))) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}}}
RabbitMQ 应用。
修改前文 SpringBoot 应用中向 ES(Elasticsearch)添加数据的功能:客户模块作为生产者,查询模块(即下文的搜索模块)作为消费者。
// 客户模块// pom.xml<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>// application.xmlspring:rabbitmq:host: 192.168.199.109port: 5672username: testpassword: testvirtual-host: /test// CustomerServiceImpl.java// HttpHeaders httpHeaders = new HttpHeaders();// httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));// HttpEntity httpEntity = new HttpEntity(json, httpHeaders);//// // 调用 ES 添加请求//// restTemplate.postForObject("http://localhost:8080/search/add", httpEntity, String.class);rabbitTemplate.convertAndSend("customer-exchange", "open.search.add", json);
// 搜索模块// pom.xml<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>// application.xmlspring:rabbitmq:host: 192.168.199.109port: 5672username: testpassword: testvirtual-host: /testlistener:simple:acknowledge-mode: manual// RabbitMQConfig.java@Configurationpublic class RabbitMQConfig {@Beanpublic TopicExchange getTopicExchange() {return new TopicExchange("customer-exchange", true, false);}@Beanpublic Queue getQueue() {return new Queue("customer-queue", true, false, false, null);}@Beanpublic Binding getBinding(TopicExchange exchange, Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("open.search.*");}}// RabbitMQListener.java@Componentpublic class RabbitMQListener {@Autowiredprivate SearchService searchService;@RabbitListener(queues = "customer-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("rabbitmq receive: " + msg);System.out.println("RoutingKey: " + message.getMessageProperties().getReceivedRoutingKey());ObjectMapper objectMapper = new ObjectMapper();Customer customer = objectMapper.readValue(msg, Customer.class);searchService.addCustomer(customer);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
文章转载自java小小小小栈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




