改造样例
依赖 pom.xml
<!--
5.X以上版本的RocketMQ的连接协议采用gRPC + Remoting, 官方主推gRPC, Remoting逐渐弃用
目前rocketmq-spring-boot-starter依赖采用的是5.X的API包以及4.X的直连Nameserver并使用Remoting协议,不经过Proxy层。此依赖包在今后有弃用或大幅度修改的可能性, 需注意
-->
<!-- 当前使用2.3.0的版本, 因为此版本默认RocketMQ版本5.2.0, 无需排除包后再添加 -->
<properties>
<rocketmq.v4.version>2.3.0</rocketmq.v4.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.v4.version}</version>
</dependency>
<dependencies>配置文件
# TMS
tms:
# 环境前缀, 开发/测试环境必须
prefix: developer
# Kafka - RocketMQ 切换开关, true为使用RocketMQ
# 后续改造会完全去除Kafka相关内容
rocketmq:
enabled: true
rocketmq:
# 测试环境Nameserver集群
name-server: 10.0.97.116:9876;10.0.97.117:9886;10.0.97.118:9896
producer:
retry-next-server: true
secret-key:
access-key:
# 服务默认生产者组, 不同服务需修改服务名
group: base_setting_producer_group_${tms.prefix:default}
consumer:
secret-key: ${rocketmq.producer.secret-key:}
access-key: ${rocketmq.producer.access-key:}
selector-expression: ${tms.prefix:*}
# 服务默认消费者组, 不同服务需修改服务名
group: base_setting_consumer_group_${tms.prefix:default}代码
⚠️注意:
EnvironmentConstant来自base-common依赖, 如果不使用base-common依赖可以单独复制此类- MQ消息收发使用
TLog作为包装,代码有一定侵入性 @RocketMQMessageListener注解中的selectorExpression变量是固定写法,无需变动。如需特殊消费者组或tag再进行修改@RocketMQMessageListener注解中的group: consumerGroup需要保证订阅一致性,同一个项目内,同一个group对应同一个topic和tag,例如consumerGroup = "${rocketmq.consumer.group}" + StrUtil.DASHED + ListenerUtils.RMQ_TOPICRocketMQListener接口中接收的泛型可以直接使用Bean,例如RocketMQListener,但是为了兼容性推荐使用String>
生产者发送消息示例(工具类)
package com.xiaoniu.setting.utils;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.xiaoniu.common.constant.EnvironmentConstant;
import com.yomahub.tlog.core.mq.TLogMqWrapBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
* @Author Tenshin
* @Date 2024-08-19 10:06
* @Version 0.0.1
* @Description RocketMqSendUtil RocketMQ消息发送工具类
*/
@Slf4j
@Component
public class RocketMqSendUtil {
/**
* RocketMQTemplate
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息
*
* @param topic 主题
* @param obj 消息对象
*/
public void send(String topic, Object obj) {
if (StrUtil.isBlank(topic)) {
log.error("====> RocketMQ消息发送 - Topic为空, 发送失败 <====");
return;
}
if (null == obj) {
log.error("====> RocketMQ消息发送 - 消息为空, 发送失败 <====");
return;
}
String json = JSON.toJSONString(obj);
log.info("====> RocketMQ消息发送 - 准备发送至MQ - 原始Topic: {}, 消息参数为: {} <====", topic, json);
// Topic添加环境Tag
String topicWithTag = EnvironmentConstant.topicWithTag(topic);
// TLog日志包装类
TLogMqWrapBean<String> tLogMqWrapBean = new TLogMqWrapBean<>(json);
try {
SendResult sendResult = rocketMQTemplate.syncSend(topicWithTag, tLogMqWrapBean);
log.info("====> RocketMQ消息发送 - 发送成功 - MessageId: {}, Topic: {} <====", sendResult.getMsgId(), topicWithTag);
} catch (MessagingException e) {
log.error("====> RocketMQ消息发送 - 发送失败 <====", e);
throw e;
}
}
}
消费者接收消息示例
package com.xiaoniu.motorcade.rocketmq;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.xiaoniu.common.utils.XiaoniuThreadLocal;
import com.xiaoniu.motorcade.kafka.model.bo.ImportData;
import com.xiaoniu.motorcade.kafka.service.AsyncImportService;
import com.xiaoniu.motorcade.kafka.service.ImportContext;
import com.xiaoniu.motorcade.kafka.service.ImportStrategyService;
import com.xiaoniu.motorcade.kafka.utils.ListenerUtils;
import com.xiaoniu.motorcade.util.ChangeClassType;
import com.yomahub.tlog.core.mq.TLogMqConsumerProcessor;
import com.yomahub.tlog.core.mq.TLogMqRunner;
import com.yomahub.tlog.core.mq.TLogMqWrapBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import java.util.Optional;
/**
* @Author Tenshin
* @Date 2024-08-20 12:46
* @Version 0.0.1
* @Description VehiclesStopInfoQueryRocketMqListener 载具历史经停点RocketMQ消息监听器
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = ListenerUtils.RMQ_TOPIC, selectorExpression = "${rocketmq.consumer.selector-expression}", consumerGroup = "${rocketmq.consumer.group}" + StrUtil.DASHED + ListenerUtils.RMQ_TOPIC)
public class VehiclesStopInfoQueryRocketMqListener implements RocketMQListener<String> {
/**
* 异步导入service
*/
@Autowired
private AsyncImportService asyncImportService;
/**
* 导入数据转换
*/
@Autowired
private ChangeClassType changeClassType;
/**
* 导入类型策略
*/
@Autowired
private ImportContext importContext;
/**
* 消费消息
*
* @param s 消息
*/
@Override
public void onMessage(String s) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("====> 载具历史经停点RocketMQ消息监听器 - 开始消费消息: {} <====", s);
try {
TLogMqWrapBean<?> tlog = Optional.ofNullable(s)
.map(x -> JSON.parseObject(x, TLogMqWrapBean.class))
.orElse(null);
if (null == tlog) {
throw new RuntimeException("消息解析失败");
}
TLogMqConsumerProcessor.process(tlog, (TLogMqRunner<String>) x -> {
try {
this.businessConsume(x);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
log.info("====> 载具历史经停点RocketMQ消息监听器 - 消费成功 <====");
} catch (Exception e) {
log.error("====> 载具历史经停点RocketMQ消息监听器 - 消费异常 <====", e);
} finally {
stopWatch.stop();
log.info("====> 载具历史经停点RocketMQ消息监听器 - 消费完成, 耗时: {} 毫秒 <====", stopWatch.getLastTaskTimeMillis());
}
}
/**
* 业务处理
*
* @param json 消息体
* @throws Exception 异常
*/
private void businessConsume(String json) throws Exception {
ImportData<?> importData = changeClassType.change(JSON.parseObject(json, ImportData.class));
ImportStrategyService importService = importContext.getImportService(importData.getDocumentType());
// 设置sessionInfo
XiaoniuThreadLocal.set(importData.getSessionInfo());
asyncImportService.asyncHandler(importData, importService, importData.getDocumentType());
}
}
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




