暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ多环境区分

soul0202 2025-06-25
91
  1. 改造样例

  2. 依赖 pom.xml

<!--
  5.X以上版本的RocketMQ的连接协议采用gRPC + Remoting, 官方主推gRPC, Remoting逐渐弃用
  目前rocketmq-spring-boot-starter依赖采用的是5.X的API包以及4.X的直连Nameserver并使用Remoting协议,不经过Proxy层。此依赖包在今后有弃用或大幅度修改的可能性, 需注意
-->

<!-- 当前使用2.3.0的版本, 因为此版本默认RocketMQ版本5.2.0, 无需排除包后再添加 -->
<properties>
  <rocketmq.v4.version>2.3.0</rocketmq.v4.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.v4.version}</version>
  </dependency>
<dependencies>


  1. 配置文件

# TMS
tms:
  # 环境前缀, 开发/测试环境必须
  prefix: developer
  # Kafka - RocketMQ 切换开关, true为使用RocketMQ
  # 后续改造会完全去除Kafka相关内容
  rocketmq:
    enabled: true

rocketmq:
  # 测试环境Nameserver集群
  name-server: 10.0.97.116:9876;10.0.97.117:9886;10.0.97.118:9896
  producer:
    retry-next-server: true
    secret-key:
    access-key:
    # 服务默认生产者组, 不同服务需修改服务名
    group: base_setting_producer_group_${tms.prefix:default}
  consumer:
    secret-key: ${rocketmq.producer.secret-key:}
    access-key: ${rocketmq.producer.access-key:}
    selector-expression: ${tms.prefix:*}
    # 服务默认消费者组, 不同服务需修改服务名
    group: base_setting_consumer_group_${tms.prefix:default}


  1. 代码

⚠️注意:
  • EnvironmentConstant 来自base-common 依赖, 如果不使用base-common 依赖可以单独复制此类
  • MQ消息收发使用TLog 作为包装,代码有一定侵入性
  • @RocketMQMessageListener 注解中的selectorExpression 变量是固定写法,无需变动。如需特殊消费者组或tag再进行修改
  • @RocketMQMessageListener 注解中的group: consumerGroup需要保证订阅一致性,同一个项目内,同一个group对应同一个topic和tag,例如consumerGroup = "${rocketmq.consumer.group}" + StrUtil.DASHED + ListenerUtils.RMQ_TOPIC
  • RocketMQListener 接口中接收的泛型可以直接使用Bean,例如RocketMQListener> ,但是为了兼容性推荐使用String
  1. 生产者发送消息示例(工具类)
package com.xiaoniu.setting.utils;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.xiaoniu.common.constant.EnvironmentConstant;
import com.yomahub.tlog.core.mq.TLogMqWrapBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

/**
 * @Author Tenshin
 * @Date 2024-08-19 10:06
 * @Version 0.0.1
 * @Description RocketMqSendUtil RocketMQ消息发送工具类
 */
@Slf4j
@Component
public class RocketMqSendUtil {

    /**
     * RocketMQTemplate
     */
    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    /**
     * 发送消息
     *
     * @param topic 主题
     * @param obj   消息对象
     */
    public void send(String topic, Object obj) {
        if (StrUtil.isBlank(topic)) {
            log.error("====> RocketMQ消息发送 - Topic为空, 发送失败 <====");
            return;
        }
        if (null == obj) {
            log.error("====> RocketMQ消息发送 - 消息为空, 发送失败 <====");
            return;
        }
        String json = JSON.toJSONString(obj);
        log.info("====> RocketMQ消息发送 - 准备发送至MQ - 原始Topic: {}, 消息参数为: {} <====", topic, json);
        // Topic添加环境Tag
        String topicWithTag = EnvironmentConstant.topicWithTag(topic);
        // TLog日志包装类
        TLogMqWrapBean<String> tLogMqWrapBean = new TLogMqWrapBean<>(json);
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(topicWithTag, tLogMqWrapBean);
            log.info("====> RocketMQ消息发送 - 发送成功 - MessageId: {}, Topic: {} <====", sendResult.getMsgId(), topicWithTag);
        } catch (MessagingException e) {
            log.error("====> RocketMQ消息发送 - 发送失败 <====", e);
            throw e;
        }
    }

}


  1. 消费者接收消息示例
package com.xiaoniu.motorcade.rocketmq;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.xiaoniu.common.utils.XiaoniuThreadLocal;
import com.xiaoniu.motorcade.kafka.model.bo.ImportData;
import com.xiaoniu.motorcade.kafka.service.AsyncImportService;
import com.xiaoniu.motorcade.kafka.service.ImportContext;
import com.xiaoniu.motorcade.kafka.service.ImportStrategyService;
import com.xiaoniu.motorcade.kafka.utils.ListenerUtils;
import com.xiaoniu.motorcade.util.ChangeClassType;
import com.yomahub.tlog.core.mq.TLogMqConsumerProcessor;
import com.yomahub.tlog.core.mq.TLogMqRunner;
import com.yomahub.tlog.core.mq.TLogMqWrapBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;

import java.util.Optional;

/**
 * @Author Tenshin
 * @Date 2024-08-20 12:46
 * @Version 0.0.1
 * @Description VehiclesStopInfoQueryRocketMqListener 载具历史经停点RocketMQ消息监听器
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = ListenerUtils.RMQ_TOPIC, selectorExpression = "${rocketmq.consumer.selector-expression}", consumerGroup = "${rocketmq.consumer.group}" + StrUtil.DASHED + ListenerUtils.RMQ_TOPIC)
public class VehiclesStopInfoQueryRocketMqListener implements RocketMQListener<String> {

    /**
     * 异步导入service
     */
    @Autowired
    private AsyncImportService asyncImportService;

    /**
     * 导入数据转换
     */
    @Autowired
    private ChangeClassType changeClassType;

    /**
     * 导入类型策略
     */
    @Autowired
    private ImportContext importContext;

    /**
     * 消费消息
     *
     * @param s 消息
     */
    @Override
    public void onMessage(String s) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        log.info("====> 载具历史经停点RocketMQ消息监听器 - 开始消费消息: {} <====", s);
        try {
            TLogMqWrapBean<?> tlog = Optional.ofNullable(s)
                .map(x -> JSON.parseObject(x, TLogMqWrapBean.class))
                .orElse(null);
            if (null == tlog) {
                throw new RuntimeException("消息解析失败");
            }
            TLogMqConsumerProcessor.process(tlog, (TLogMqRunner<String>) x -> {
                try {
                    this.businessConsume(x);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            log.info("====> 载具历史经停点RocketMQ消息监听器 - 消费成功 <====");
        } catch (Exception e) {
            log.error("====> 载具历史经停点RocketMQ消息监听器 - 消费异常 <====", e);
        } finally {
            stopWatch.stop();
            log.info("====> 载具历史经停点RocketMQ消息监听器 - 消费完成, 耗时: {} 毫秒 <====", stopWatch.getLastTaskTimeMillis());
        }
    }

    /**
     * 业务处理
     *
     * @param json 消息体
     * @throws Exception 异常
     */
    private void businessConsume(String json) throws Exception {
        ImportData<?> importData = changeClassType.change(JSON.parseObject(json, ImportData.class));
        ImportStrategyService importService = importContext.getImportService(importData.getDocumentType());
        // 设置sessionInfo
        XiaoniuThreadLocal.set(importData.getSessionInfo());
        asyncImportService.asyncHandler(importData, importService, importData.getDocumentType());
    }

}

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论