暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Apache RocketMQ在海尔智家的应用实践

RocketMQ官微 2019-11-11
1121

本文整理自海尔的姚翔在2019年10月26日Apache RocketMQ开发者沙龙西安站的分享。

本文的主要内容包括以下几个方面:

  1. MQ的选型

  2. MQ使用

  3. 海尔智能生产实践

1. MQ的选型


1.1 MQ作用

选型方面是先看不同MQ的功能点,然后根据企业的具体业务场景去选。如图,削峰填谷、异步、解耦是比较常用的,其他的还有数据流转、微服务、IOT等等。

1.2 基本特性

MQ有很多,这里主要根据可靠性,选择3种比较有代表性的进行分析:RabbitMQ、Kafka、RocketMQ。主要的特性对比如图所示。

从图中可以看出,RocktMQ相对RabbitMQ,特性更多,体量和性能也更好。后面再针对Kafka和RocketMQ进行一些对比。

1.4 考量

(1)高可靠

RocketMQ:支持 同步 刷盘、异步刷盘。
Kafka:支持异步刷盘,通过修改异步刷盘的频率,每条消息都刷一次磁盘,但是频繁的磁盘读写直接导致性能的下降。

(2)高性能、低延迟

rocketmq 和 kafka :顺序写入和零拷贝
kafka:随着 topic 数量增加,消息分散落盘策略会导致磁盘 IO 竞争激烈成为瓶颈(随机写的可能性增大,分区之间刷盘的冲撞率高),稳定性和性能会急剧下降。

rocketmq所有的消息都是写到同一个commitlog文件,kafka是每一个分区(partition)都对应一个文件。

rocketmq存储结构:

kafka存储:

(3)消息特性

rocketmq:同步、异步、批量(同步)、顺序、事务、定时,回溯。

因为业务需求,我们可能需要支持顺序、事务。rocketmq是目前支持比较丰富的。

(4)稳定及社区

rocketmq:过阿里双十一的洗礼,万亿级堆积能力及千万级的TPS,99.996% 的延迟在 10ms 以内,社区非常活跃。

2. MQ的使用


我们使用的时候,还没有对应的rocketmq-spring-boot-starter,所以我们针对spring,在rocketmq原生的java sdk上进行了自己的封装。我们这边主要还是jdk1.7,之前rocketmq server端是支持jdk 1.8及以上的。另外有看到社区也贡献了对应的spring-boot-starter。

2.1 原生二次封装

如图,生产者的封装比较简单,主要有初始化(init-method)和销毁(destroy-method),还有对应的producerGroupName和nameServerAddr属性。

消费者主要有两个维度,一个也是一些properties属性设置,另外是一些订阅topic的信息subscriptionTable。

如图是展开的subscriptionTable,订阅了4个topic,然后有对应的topic、tag属性。

由于原生SDK中实例是基于clientId获取,而clientId 是由clientIp@instanceName@unitName三部分组成, 所以我们重写了instanceName以达到一个消费端可以启动多个实例。

2.2 Spring boot

如图是spring message的一个模型。消息体有header、payload,然后producer、consumer之间有一个消息通道。

对于生产者,这里 RocketMQTemplate 继承了抽象类AbstractMessageSendingTemplate,并重写抽象方法doSend,doSend里是调用SyncSend,另外RocketMQTemplate 也提供了 asyncSend,asyncSendOrderly等方法,这些方法再底层是调用的是原生的 rocketmq 的send方法。

对于消费者,DefaultRocketMQListenerContainer 为默认的容器,默认使用DefaultMQPushConsumer。

3. 海尔智家生产实践


3.1 业务场景

例如用户登录之后,会进行一些用户等级、积分的计算,不同系统之间进行一个解耦。

一些运营活动,可能需要做到一些异步化处理,比如抽奖之后,积分结果可能不需要实时看到。

如图是IOT方面的一个场景,设备绑定之后,需要在微信小程序里看到对应的状态,这里使用websocket进行一个实时的双向通信。

kafka消息平台为数据源,有大量的设备绑定和解绑消息。然后redis会维护缓存用户和设备的关系。

kafka的消息量比较大,单节点消费比较慢,所以我们用多个broker,多节点消费。经过一些过滤之后,发给rocketmq,然后再转发给keepalive模块消费。

keepalive中有保存用户和session的对应关系。

3.2 实践

如图是运维方面的一些实践。

只配置一个brokerIp1,来避免多网卡问题。

topic自动创建方面设置为false,避免一些问题。

线上部署的时候,预留足够大的pagecache来保证稳定性。

业务上,对于keys必须设置,比如可以选用用户id等等,便于出现问题时消息的查找。

消费端的问题,消费重复的问题,由我们自己去做消息的幂等性,这里我们用db来做,对性能要求高的可以用redis来做。

消费慢的问题,随着量的增加,我们做了一些优化,如提高并行度、批量消费,对于一些不重要的场景,比如耗时比较长的业务,可能会一直超时,最后放到了死信队列,这里我们可以直接异步化,消息发送后直接返回。

作者介绍:

姚翔:开源爱好者,先后参与中东运营商系统、电信财务共享、物联网系统的设计及 研发,主要关注分布式系统:微服务、消息中间件等。目前在海尔智家主要负责系统的 MQ 及微服务的落地及演进相关的技术平台。

徐少鹏:中科院软件所在读研究生,对分布式、中间件比较感兴趣,积极参与开源。

文章转载自RocketMQ官微,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论