定时关单功能
生产者相当于下单接口,当用户下单时,发送一个延迟消息到消息队列中
一段时间后延迟消息被消费者监听到[延迟队列特点]
消费者通过查询订单是否支付/调用第三方支付的查询订单状态接口来确认订单是否支付,然后决定修改订单状态以及消息是否需要重新入队
模型抽取
生产者相当于下单服务,下单的时候会发送一个订单号的消息给延迟队列
消费者相当于定时任务,当延迟消息被消费者监听到的时候,就会检查这笔订单是否支付成功,如果没有支付,则进行关单。


完成配置,即配置好消息队列
引入依赖
<!--引入AMQP--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
配置rabbitmq的服务端信息以及交换机,队列,路由key信息
#消息队列rabbitmq:host: 8.129.113.233port: 5672virtual-host: /password: passwordusername: admin#开启手动确认消息listener:simple:acknowledge-mode: manual#自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queuemqconfig:#延迟队列,不能被监听消费order_close_delay_queue: order.close.delay.queue#延迟队列的消息过期后转发的队列order_close_queue: order.close.queue#交换机order_event_exchange: order.event.exchange#进入延迟队列的路由keyorder_close_delay_routing_key: order.close.delay.routing.key#消息过期,进入释放队列的key,进入死信队列的keyorder_close_routing_key: order.close.routing.key#消息过期时间,毫秒,测试改为15秒ttl: 15000
编写配置类,读取rabbitmq的相关信息,交给spring容器管理
@Configuration@Datapublic class RabbitMQConfig {/*** 交换机*/@Value("${mqconfig.order_event_exchange}")private String eventExchange;/*** 延迟队列*/@Value("${mqconfig.order_close_delay_queue}")private String orderCloseDelayQueue;/*** 关单队列*/@Value("${mqconfig.order_close_queue}")private String orderCloseQueue;/*** 进入延迟队列的路由key*/@Value("${mqconfig.order_close_delay_routing_key}")private String orderCloseDelayRoutingKey;/*** 进入死信队列的路由key*/@Value("${mqconfig.order_close_routing_key}")private String orderCloseRoutingKey;/*** 过期时间*/@Value("${mqconfig.ttl}")private Integer ttl;/*** 消息转换器* @return*/@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/*** 创建交换机 Topic类型,也可以用dirct路由* 一般一个微服务一个交换机* @return*/@Beanpublic Exchange orderEventExchange(){return new TopicExchange(eventExchange,true,false);}/*** 延迟队列*/@Beanpublic Queue orderCloseDelayQueue(){Map<String,Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange",eventExchange);args.put("x-dead-letter-routing-key",orderCloseRoutingKey);args.put("x-message-ttl",ttl);return new Queue(orderCloseDelayQueue,true,false,false,args);}/*** 死信队列,普通队列,用于被监听*/@Beanpublic Queue orderCloseQueue(){return new Queue(orderCloseQueue,true,false,false);}/*** 第一个队列,即延迟队列的绑定关系建立* @return*/@Beanpublic Binding orderCloseDelayBinding(){return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,orderCloseDelayRoutingKey,null);}/*** 死信队列绑定关系建立* @return*/@Beanpublic Binding orderCloseBinding(){return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,eventExchange,orderCloseRoutingKey,null);}}
定时关单服务生产者端代码设计与实现
注意点
用户下单以后需要投递一个延迟消息msg到延迟队列中
延迟消息中的内容需要包含订单流水号,供消费者去查询订单支付信息

延迟消息格式 = 消息id + 订单流水号
@Datapublic class OrderMessage {/*** 消息id*/private Long messageId;/*** 订单号*/private String outTradeNo;}
下单以后发送延迟消息的代码
//发送延迟消息,用于自动关单OrderMessage orderMessage = new OrderMessage();orderMessage.setOutTradeNo(orderOutTradeNo);rabbitTemplate.convertAndSend(rabbitMQConfig.getEventExchange(),rabbitMQConfig.getOrderCloseDelayRoutingKey(),orderMessage);
定时关单服务消费者端代码设计与实现
查询订单数据库中订单是否存在
不存在:则说明消息错误,发送消息ack确认
存在: 查询订单数据库中订单是否支付
若支付:则说明消息正常已经付款,发送ack确认
若未支付: 调用第三方支付平台查询订单接口查询支付状态
若支付:则修改订单数据库中的订单状态,并发送ack确认
若未支付:则修改订单状态为取消,发送ack确认

消费者端的逻辑
@Slf4j@Component@RabbitListener(queues = "${mqconfig.order_close_queue}")public class ProductOrderMQListener {@Autowiredprivate ProductOrderService productOrderService;/**** 消费重复消息,幂等性保证* 并发情况下如何保证安全** @param orderMessage* @param message* @param channel* @throws IOException*/@RabbitHandlerpublic void closeProductOrder(OrderMessage orderMessage, Message message, Channel channel) throws IOException {log.info("监听到消息:closeProductOrder:{}",orderMessage);long msgTag = message.getMessageProperties().getDeliveryTag();try{boolean flag = productOrderService.closeProductOrder(orderMessage);//为true说明订单正常处理以付款,发送ack确认if(flag){channel.basicAck(msgTag,false);}else {//为false说明订单异常,重新入队channel.basicReject(msgTag,true);}}catch (IOException e){log.error("定时关单失败:",orderMessage);channel.basicReject(msgTag,true);}}}
查询订单状态并修改订单数据库的业务逻辑
/*** 定时关单* @param orderMessage* @return*/@Overridepublic boolean closeProductOrder(OrderMessage orderMessage) {ProductOrderDO productOrderDO = productOrderMapper.selectOne(new QueryWrapper<ProductOrderDO>().eq("out_trade_no",orderMessage.getOutTradeNo()));if(productOrderDO == null){//订单不存在log.warn("直接确认消息,订单不存在:{}",orderMessage);return true;}if(productOrderDO.getState().equalsIgnoreCase(ProductOrderStateEnum.PAY.name())){//已经支付log.info("直接确认消息,订单已经支付:{}",orderMessage);return true;}//向第三方支付查询订单是否真的未支付(开发了AlipayStrategy以后才完善了这部分代码)PayInfoVO payInfoVO = new PayInfoVO();payInfoVO.setPayType(productOrderDO.getPayType());payInfoVO.setOutTradeNo(orderMessage.getOutTradeNo());String payResult = payFactory.queryPaySuccess(payInfoVO);//结果为空,则未支付成功,本地取消订单if(StringUtils.isBlank(payResult)){productOrderMapper.updateOrderPayState(productOrderDO.getOutTradeNo(),ProductOrderStateEnum.CANCEL.name(),ProductOrderStateEnum.NEW.name());log.info("结果为空,则未支付成功,本地取消订单:{}",orderMessage);return true;}else {//支付成功,主动的把订单状态改成UI就支付,造成该原因的情况可能是支付通道回调有问题log.warn("支付成功,主动的把订单状态改成已经支付,造成该原因的情况可能是支付通道回调有问题:{}",orderMessage);productOrderMapper.updateOrderPayState(productOrderDO.getOutTradeNo(),ProductOrderStateEnum.PAY.name(),ProductOrderStateEnum.NEW.name());return true;}}
思考:在两个不同的系统之间进行交互的时候,我们首先需要考虑的是这两个系统之间需要设计什么徉的交互协议,即数据传输格式。而设计这种数据传输的格式需要根据这些消息的作用是什么来进行设计!!!
算法训练营永久班上课开始啦,欢迎大家积极报名~
奔跑的小梁,公众号:梁霖编程工具库算法训练营春节价格通知,2023年2月12日
文章转载自奔跑的小梁,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




