暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

乐信基础框架系列--MQ

乐信技术精英社 2020-07-16
616

作者 | davidyang(杨尚京)

导读

你是否在使用MQ消费者的时候感叹,一大堆监听代码,编码效率又低,代码又冗长,看完这篇文章,MQ编码效率提升20%!!!!

对于乐信的每个后端开发同学而言,MQ是大家无法忽略的一个组件。

  • 你要解耦的时候,你要使用MQ

  • 你要削峰的时候,你要使用MQ

  • 你要异步的时候,你要使用MQ

公司的MQ方案选择了阿里的RocketMQ,架构的同学在上面做了一层封装。

MQ基本模型

对于每个业务模块而言,都很可能需要使用MQ上述Producer和Consumer里面的的一种角色。

如何接入公司MQ

Producer

Producer的使用比较简单,需要理解的东西也不多。

	String message = JSON.toJSONString(input);

mqPushService.pushVirtualTopic(topic, JSONObject.parseObject(message));

对于公司的场景,大部分同学直接引用MqPushService类,调用pushVirtualTopic函数就可以解决问题。

    void pushVirtualTopic(String mq, Map<String, Object> body, String... args);


Consumer

然而使用Consumer的时候,情况变得复杂了起来。相对于Producer只需要把消息发出去这一步操作而言。Consumer需要消费一条消息,除了consume这个操作,还需要先做一步subscribe才可以。这个原理上也不难理解,在一个MQ集群里面有非常多的topic,每个topic又有不止一个消费者。如果不区分管理,那么每产生一条消息,都需要向所有的consumer服务广播消息,而每个consumer都需要把每一条消息拉到本地,然后确认自己不需要消费再丢弃。无论对MQ集群还是对consumer,都是不堪重负。所以有了subscribe以后,消息只需要对特定的consumer方投送,而每个consumer也只会接收他已经subscribe的消息,集群和consumer的负担都大大减轻。

下面我们看看subscribe和consumer分别需要做哪些事情。好吧,consumerGroup还稍微能够理解,但是

  • destinations是个啥?

  • listener又是啥?

  • consumeConfig是配置么,我要配置什么?

具体待我们细细分析

    /***
* 增加一个消费组名称为consumerGroup的消费者, 并使其订阅指定的消息。
*
* @param consumerGroup 消费组名称
* @param destinations 消费的目标对象。 一个消费者可以订阅多个MqDestination 但同一个group的各个client所指定的destination列表必须是完全相同的
* @param listener 回调对象, 处理消息的具体逻辑应该被封装在其中
* @param consumeConfig 指定了消息订阅的一些配置参数,具体看MqConsumeConfig类的定义说明。若为null,则表示所有配置项使用默认配置。
* @throws MqConsumeException 订阅消费时可能会发生的一些异常,当发生异常时,通常意味着本次订阅没有生效。 具体有哪些异常请看MqConsumeErrorCode的说明
* @see MqDestination
* @see MqMessageListener
* @see MqConsumeConfigInfo
*/

void subscribe(String consumerGroup, List<MqDestination> destinations, MqMessageListener listener, MqConsumeConfigInfo consumeConfig) throws MqConsumeException;

/***
* 注销掉消费组对消息的订阅
*
* @param consumerGroup 消费组名称
*/

void unsubscribe(String consumerGroup);


MqDestination

确定了两个元素,topic和tags,topic大家都理解,tags主要为了过滤用,平常我们用不到。

public class MqDestination {

/*** 队列名称 */
private String topic;

/*** RocketMQ中用于过滤消息的tags, 为星号*或为null时表示不过滤消息 */
private String tags;
}

MqMessageListener

MqMessageListener声明了我们要消费的函数,其中的参数MqMessage和MqConsumeContext 又是我们需要理解的+_+

public interface MqMessageListener {
/***
* @param msgs 待消费的批量消息, 消息的定义具体看 Message 类的说明
* @param context 消费消息时的上下文环境 具体看 MqConsumeContext 类的说明
* @return MqConsumeStatus 枚举对象, MqConsumeStatus中各个选项的意义具体看 MqConsumeStatus 类的说明。
* @see MqMessage
* @see MqConsumeStatus
* @see MqConsumeContext
*/

MqConsumeStatus consumeMessage(List<MqMessage> msgs, MqConsumeContext context);
}


MqConsumeConfigInfo

MqConsumeConfigInfo包含内容比较多,比如我们从哪个位置开始消费,使用集群消费还是广播消费。内容虽然多,但是大部分情况下大家的选择都是一致的。

绝大多数情况下,我们应该选择在启动时从队列最后位置消费,也会选择集群消息

public class MqConsumeConfigInfo {
/***当消费者所属的group第一次启动时, 指定从哪个位置开始消费, 如果不是第一次消费则该配置项不起作用*/
private MqConsumeFromWhere mqConsumeFromWhere = MqConsumeFromWhere.MQ_CONSUME_FROM_LAST_OFFSET;

private MqConsumeModel mqConsumeModel = MqConsumeModel.CLUSTERING;

/*** mqConsumeFromWhere的值为MQ_CONSUME_FROM_TIMESTAMP时才生效,指定一个字符串表示的时间, 从该时间戳处的进度开始消费。 时间的格式为 yyyyMMddHHmmss*/
private String startConsumeTimeStamp;

/*** 消费者组件支持批量方式消费消息,batchMaxSize定义了一次批量处理的消息个数上限, 最大为 1024 默认值为 1,即默认是一条一条地消费 */
private int batchMaxSize = 1;

private int consumeMinTreadCnt = 20;
private int consumeMaxTreadCnt = 20;

/*** 最大消费线程数, consumeMinTreadCnt 一起决定了单个应用实例消费的并行度, 默认为64 */

/**
* 并行的消费者个数,默认1,如果此值配置大些的话 需要注意JAVA程序启动内存的配置,也要相应的增大
*/

private int ccConsumerNum = 1;

private boolean isConsumeOrderly = false;
}

如何接入lexin_common的MQ

对于lexin_common而言,我们的宗旨是让业务研发人员能够专注于业务研发。Producer端还好,就一个方法,也没有额外东西需要去理解;但Consumer端接入需要理解的信息太多,这是我们所深恶痛绝的。

开发人员对Consumer端各个字段进行了分析

字段名字段说明默认值
topictopic名称
group消费组名称
tag消息tag
consumeFromWhere消费位点,从哪里开始消费MqConsumeFromWhere.MQ_CONSUME_FROM_LAST_OFFSET
consumerModel消费模式MqConsumeModel.CLUSTERING
batchMaxSize每次拉取消息数量1
consumeMinTreadCnt每个消费线程组最小线程数量20
consumeMaxTreadCnt每个消费线程组最大线程数量20
consumerNum消费线程组数量1
isConsumeOrderly是否顺序消费false

分析下来,必不可少的字段只有两个

  • topic:消费什么消息

  • group:标识消费者

再加上我们处理消息的方法,这三者已经可以构成我们消费一条消息的基本元素了。基于这个,lexin_common选择了

  • 通过接口定义处理消息的方法

  • 通过注解来声明该方法对应的相关配置

@Topic(name = "LEXIN_COMMON_MQ_TOPIC_ORDER", group = "orderMqHandlerGroup")
public class OrderMqHandlerImpl implements MqHandler<Order> {

@Override
public MqConsumeStatus handle(Order msg) {
}
}

如上面代码代码所示,业务同学只需要实现MqHandler方法来处理消息,然后通过topic注解确定好要消费的topic消息以及本身的group。相对于原本的代码,无论从代码数量,还是从接入简便程度,都有极大的优化。

你需要来试用一下吗?可以参考我们的接入说明哦!

参考

  • [1] rocketmq文档学习(二):概述,物理部署结构,逻辑部署结构

end




在看点这里
文章转载自乐信技术精英社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论