
作者 | davidyang(杨尚京)
导读
你是否在使用MQ消费者的时候感叹,一大堆监听代码,编码效率又低,代码又冗长,看完这篇文章,MQ编码效率提升20%!!!!
对于乐信的每个后端开发同学而言,MQ是大家无法忽略的一个组件。
你要解耦的时候,你要使用MQ
你要削峰的时候,你要使用MQ
你要异步的时候,你要使用MQ
公司的MQ方案选择了阿里的RocketMQ,架构的同学在上面做了一层封装。
MQ基本模型

对于每个业务模块而言,都很可能需要使用MQ上述Producer和Consumer里面的的一种角色。
如何接入公司MQ
Producer
Producer的使用比较简单,需要理解的东西也不多。
String message = JSON.toJSONString(input);
mqPushService.pushVirtualTopic(topic, JSONObject.parseObject(message));
对于公司的场景,大部分同学直接引用MqPushService类,调用pushVirtualTopic函数就可以解决问题。
void pushVirtualTopic(String mq, Map<String, Object> body, String... args);
Consumer
然而使用Consumer的时候,情况变得复杂了起来。相对于Producer只需要把消息发出去这一步操作而言。Consumer需要消费一条消息,除了consume这个操作,还需要先做一步subscribe才可以。这个原理上也不难理解,在一个MQ集群里面有非常多的topic,每个topic又有不止一个消费者。如果不区分管理,那么每产生一条消息,都需要向所有的consumer服务广播消息,而每个consumer都需要把每一条消息拉到本地,然后确认自己不需要消费再丢弃。无论对MQ集群还是对consumer,都是不堪重负。所以有了subscribe以后,消息只需要对特定的consumer方投送,而每个consumer也只会接收他已经subscribe的消息,集群和consumer的负担都大大减轻。
下面我们看看subscribe和consumer分别需要做哪些事情。好吧,consumerGroup还稍微能够理解,但是
destinations是个啥?
listener又是啥?
consumeConfig是配置么,我要配置什么?
具体待我们细细分析
/***
* 增加一个消费组名称为consumerGroup的消费者, 并使其订阅指定的消息。
*
* @param consumerGroup 消费组名称
* @param destinations 消费的目标对象。 一个消费者可以订阅多个MqDestination, 但同一个group的各个client所指定的destination列表必须是完全相同的
* @param listener 回调对象, 处理消息的具体逻辑应该被封装在其中
* @param consumeConfig 指定了消息订阅的一些配置参数,具体看MqConsumeConfig类的定义说明。若为null,则表示所有配置项使用默认配置。
* @throws MqConsumeException 订阅消费时可能会发生的一些异常,当发生异常时,通常意味着本次订阅没有生效。 具体有哪些异常请看MqConsumeErrorCode的说明
* @see MqDestination
* @see MqMessageListener
* @see MqConsumeConfigInfo
*/
void subscribe(String consumerGroup, List<MqDestination> destinations, MqMessageListener listener, MqConsumeConfigInfo consumeConfig) throws MqConsumeException;
/***
* 注销掉消费组对消息的订阅
*
* @param consumerGroup 消费组名称
*/
void unsubscribe(String consumerGroup);
MqDestination
确定了两个元素,topic和tags,topic大家都理解,tags主要为了过滤用,平常我们用不到。
public class MqDestination {
/*** 队列名称 */
private String topic;
/*** RocketMQ中用于过滤消息的tags, 为星号*或为null时表示不过滤消息 */
private String tags;
}
MqMessageListener
MqMessageListener声明了我们要消费的函数,其中的参数MqMessage和MqConsumeContext 又是我们需要理解的+_+
public interface MqMessageListener {
/***
* @param msgs 待消费的批量消息, 消息的定义具体看 Message 类的说明
* @param context 消费消息时的上下文环境 具体看 MqConsumeContext 类的说明
* @return MqConsumeStatus 枚举对象, MqConsumeStatus中各个选项的意义具体看 MqConsumeStatus 类的说明。
* @see MqMessage
* @see MqConsumeStatus
* @see MqConsumeContext
*/
MqConsumeStatus consumeMessage(List<MqMessage> msgs, MqConsumeContext context);
}
MqConsumeConfigInfo
MqConsumeConfigInfo包含内容比较多,比如我们从哪个位置开始消费,使用集群消费还是广播消费。内容虽然多,但是大部分情况下大家的选择都是一致的。
绝大多数情况下,我们应该选择在启动时从队列最后位置消费,也会选择集群消息
public class MqConsumeConfigInfo {
/***当消费者所属的group第一次启动时, 指定从哪个位置开始消费, 如果不是第一次消费则该配置项不起作用*/
private MqConsumeFromWhere mqConsumeFromWhere = MqConsumeFromWhere.MQ_CONSUME_FROM_LAST_OFFSET;
private MqConsumeModel mqConsumeModel = MqConsumeModel.CLUSTERING;
/*** 当mqConsumeFromWhere的值为MQ_CONSUME_FROM_TIMESTAMP时才生效,指定一个字符串表示的时间, 从该时间戳处的进度开始消费。 时间的格式为 yyyyMMddHHmmss*/
private String startConsumeTimeStamp;
/*** 消费者组件支持批量方式消费消息,batchMaxSize定义了一次批量处理的消息个数上限, 最大为 1024。 默认值为 1,即默认是一条一条地消费 */
private int batchMaxSize = 1;
private int consumeMinTreadCnt = 20;
private int consumeMaxTreadCnt = 20;
/*** 最大消费线程数, 和 consumeMinTreadCnt 一起决定了单个应用实例消费的并行度, 默认为64 */
/**
* 并行的消费者个数,默认1,如果此值配置大些的话 需要注意JAVA程序启动内存的配置,也要相应的增大
*/
private int ccConsumerNum = 1;
private boolean isConsumeOrderly = false;
}
如何接入lexin_common的MQ
对于lexin_common而言,我们的宗旨是让业务研发人员能够专注于业务研发。Producer端还好,就一个方法,也没有额外东西需要去理解;但Consumer端接入需要理解的信息太多,这是我们所深恶痛绝的。
开发人员对Consumer端各个字段进行了分析
| 字段名 | 字段说明 | 默认值 |
|---|---|---|
| topic | topic名称 | 空 |
| group | 消费组名称 | 空 |
| tag | 消息tag | 空 |
| consumeFromWhere | 消费位点,从哪里开始消费 | MqConsumeFromWhere.MQ_CONSUME_FROM_LAST_OFFSET |
| consumerModel | 消费模式 | MqConsumeModel.CLUSTERING |
| batchMaxSize | 每次拉取消息数量 | 1 |
| consumeMinTreadCnt | 每个消费线程组最小线程数量 | 20 |
| consumeMaxTreadCnt | 每个消费线程组最大线程数量 | 20 |
| consumerNum | 消费线程组数量 | 1 |
| isConsumeOrderly | 是否顺序消费 | false |
分析下来,必不可少的字段只有两个
topic:消费什么消息
group:标识消费者
再加上我们处理消息的方法,这三者已经可以构成我们消费一条消息的基本元素了。基于这个,lexin_common选择了
通过接口定义处理消息的方法
通过注解来声明该方法对应的相关配置
@Topic(name = "LEXIN_COMMON_MQ_TOPIC_ORDER", group = "orderMqHandlerGroup")
public class OrderMqHandlerImpl implements MqHandler<Order> {
@Override
public MqConsumeStatus handle(Order msg) {
}
}
如上面代码代码所示,业务同学只需要实现MqHandler方法来处理消息,然后通过topic注解确定好要消费的topic消息以及本身的group。相对于原本的代码,无论从代码数量,还是从接入简便程度,都有极大的优化。
你需要来试用一下吗?可以参考我们的接入说明哦!
参考
[1] rocketmq文档学习(二):概述,物理部署结构,逻辑部署结构
end







