暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

云服务推送API中消息中间件的使用

Sumslack团队 2018-09-13
330

在中大型项目开发过程中,我们往往会使用消息中间件来对各类消息进行路由与管理,已达到模块解耦的目的,降低项目复杂性,阿里巴巴在ActiveMQ的基础上,开源了RocketMQ,提供了更符合实际项目需要的各类功能包括:包括同步消息,异步消息,单向传输消息,顺序消费,定时消息,分布式事务消息等,阿里用它处理着每天几千亿的消息路由,用在包括订单,交易,充值,流计算,消息推送,binlog分发等应用场景。

概述

RocketMQ
是一个分布式消息中间件,并支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等,架构图如下:

Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上; Broker具体处理消息的存储和服务;生产者和消费者是消息的源头和归宿。

性能上,RocketMQ
单机TPS约7w/s,部署3个Broker,可以最高跑到12w/s,消息大小10个字节,RocketMQ单机最高支持5万个队列,Load不会发生明显变化,RocketMQ
采用长轮询,同Push实时性一致,消息投递延迟在几个毫秒级别,消息失败支持定时重试,每次重试间隔时间顺延递增,失败重试的消息不会因为consumer宕机而丢失;


相比其他消息中间件,RocketMQ带来以下特性:


  1. 消息查询:指定Message Key或Message Id查询消息;

  2. 分布式事务支持;

  3. 消息回溯:支持从某个时刻重新消费消息;

  4. 定时消息;

  5. 消息顺序:支持FIFO的顺序消息消费,一台Broker宕机后,发送消息失败,不会引起乱序;

  6. 消息重试;

  7. 消息实时投递;

  8. 消息轨迹;

  9. MQ消息监控控制台:rocketmq-console

  10. 支持丰富的消息过滤机制:通过Tags过滤,通过Java Class做任意形式的过滤,也可以做Message Body的过滤拆分;

  11. 消息堆积处理:支持亿级消息堆积;

  12. 高效的订阅者水平扩展能力,丰富的拉取消息模式;


安装

安装环境
  • 64bit OS, Linux/Unix/Mac is recommended;

  • 64bit JDK 1.8+;

  • Maven 3.2.x

  • Git

源码编译
1> unzip rocketmq-all-4.3.0-source-release.zip
2> cd rocketmq-all-4.3.0/
3> mvn -Prelease-all -DskipTests clean install -U
4> cd distribution/target/apache-rocketmq

启停服务
 1> nohup sh bin/mqnamesrv &
2> tail -f ~/logs/rocketmqlogs/namesrv.log
3The Name Server boot success...
4> nohup sh bin/mqbroker -n localhost:9876 &
5> tail -f ~/logs/rocketmqlogs/broker.log 
6The broker[%s, 172.30.30.233:10911] boot success...
7// 关闭服务
8> sh bin/mqshutdown broker
9The mqbroker(36695) is running...
10Send shutdown request to mqbroker(36695) OK
11> sh bin/mqshutdown namesrv
12The mqnamesrv(36664) is running...
13Send shutdown request to mqnamesrv(36664) OK

命令行发送消费消息
1 > export NAMESRV_ADDR=localhost:9876
2 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
3 SendResult [sendStatus=SEND_OK, msgId= ...
4 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
5 ConsumeMessageThread_%d Receive New Messages: [MessageExt...


消息队列控制台

除了消息队列控制台,其他更多的RocketMQ
扩展可以从https://github.com/apache/rocketmq-externals中找到

运维
  • 你可以修改这个服务使用的navesvr的地址

  • 你可以修改这个服务是否使用VIPChannel(如果你的mq server版本小于3.5.8,请设置不使用)

驾驶舱
  • 查看broker的消息量(总量/5分钟图)

  • 查看单一主题的消息量(总量/趋势图)

集群页面
  • 查看集群的分布情况:cluster与broker关系、broker

  • 查看broker具体信息/运行信息

  • 查看broker配置信息

Topic

提供了Topic管理,筛选功能,Topic状态管理,路由信息,发送消息等功能;

Consumer

提供了消费者管理,筛选功能

发布管理

消息查询

根据Topic和时区查询,最多显示2000条;根据Topic和Key搜索;根据消息主题和消息ID查询等;


开发与使用

先定义生产者和消费者两个普通类,以此展开:

  1public class RocketProducer {
 2    private DefaultMQProducer producer = null;
 3    private String groupName;
 4    public RocketProducer(String host,String groupName{
 5        this.groupName = groupName;
 6        producer = new DefaultMQProducer(groupName);
 7        producer.setNamesrvAddr(host);
 8    }
 9    public void start({
10        try {
11            producer.start();
12        } catch (MQClientException e) {
13            e.printStackTrace();
14        }
15    }
16    public void startAsync({
17        try {
18            producer.start();
19            producer.setRetryTimesWhenSendAsyncFailed(0);
20        } catch (MQClientException e) {
21            e.printStackTrace();
22        }
23    }
24    public void shutdown({
25        producer.shutdown();
26    }
27    public SendResult sendMessage(String topic,Object message{
28        return sendMessage(topic,"",message);
29    }
30    public SendResult sendMessage(String topic,String tags,Object message{
31        try {
32            Message msg = new Message(topic,
33                tags,
34                (JSON.toJSONString(message)).getBytes(RemotingHelper.DEFAULT_CHARSET)
35            );
36            SendResult result = producer.send(msg);
37            return result;
38        }catch(Exception ex) {
39            ex.printStackTrace();
40        }
41        return null;
42    }
43
44    public void sendMessageAsync(String topic,String tags,String order,Object message,SendCallback callback{
45        try {
46            Message msg = new Message(topic,
47                tags,order,
48                (JSON.toJSONString(message)).getBytes(RemotingHelper.DEFAULT_CHARSET));
49            producer.send(msg, callback);
50        }catch(Exception ex) {
51            ex.printStackTrace();
52        }
53    }
54
55    public void sendMessageAsync(String topic,String tags,Object message,SendCallback callback{
56        try {
57            Message msg = new Message(topic,
58                tags,
59                (JSON.toJSONString(message)).getBytes(RemotingHelper.DEFAULT_CHARSET));
60            producer.send(msg, callback);
61        }catch(Exception ex) {
62            ex.printStackTrace();
63        }
64    }
65
66    public void sendMessageAsync(String topic,Object message,SendCallback callback{
67        sendMessageAsync(topic,"",message,callback);
68    }
69
70    public void sendMessageOneway(String topic,String tags,Object message{
71        try {
72            Message msg = new Message(topic,
73                tags,
74                (JSON.toJSONString(message)).getBytes(RemotingHelper.DEFAULT_CHARSET));
75            producer.sendOneway(msg);
76        }catch(Exception ex) {
77            ex.printStackTrace();
78        }
79    }
80}
81public class RocketConsumer {
82    private String groupName;
83    private DefaultMQPushConsumer consumer;
84    public RocketConsumer(String host,String groupName{
85        this.groupName = groupName;
86        consumer = new DefaultMQPushConsumer(groupName);
87        consumer.setNamesrvAddr(host);
88    }
89
90    public void subscribe(String topicName,MessageListenerConcurrently msgListener{
91        try {
92            consumer.subscribe(topicName, "*");
93            consumer.registerMessageListener(msgListener);
94            consumer.start();
95        }catch(Exception ex) {
96            ex.printStackTrace();
97        }
98    }
99
100    public void broadcast(String topicName,MessageListenerConcurrently msgListener{
101        try {
102            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
103            consumer.setMessageModel(MessageModel.BROADCASTING);
104            consumer.subscribe(topicName, "*");
105            consumer.registerMessageListener(msgListener);
106            consumer.start();
107        }catch(Exception ex) {
108            ex.printStackTrace();
109        }
110    }
111}


  • 消费消息(含tags过滤与广播)

如果消息消费不成功,可以设置RECONSUME_LATER
,如果消息有顺序,需设置Key;


  • 消费消息:复杂过滤条件



  • log4j直接输出到RocketMQ也是可以的,只需配置log4j配置文件即可;

  • 分布式事务控制:必须实现TransactionListener
    ,建立分布式事务最终一致性;


云平台应用场景
推送消息有序,并且消息不丢失

典型应用场景是数据变化通知,云平台为全公司提供基础数据变动推送服务,通过canal
侦听基础库的数据变化,将哪个表哪条记录发生变化通过到第三方URL,这些消息必须严格保证有序,否则就会引起数据严重的错乱问题,比如原推送是先删除A表记录,再删除A表,如果推送给用户收到的消息是先删除A表,再删除A表记录,那么就会引起问题,所以这里推送给第三URL的消息必须保证严格的顺序,并且消息不允许丢失,如下我们的Demo程序中,推送的10条消息到客户端是完全有序的,并且消息不丢失,即便中间第三方URL中断无法访问也不会丢失:

推送消息无序,并且消息不丢失

比如行情数据的推送,自带时间点对应哪个数据点,至于A时刻A1值先到,还是B时刻B1值先到,都不影响最终数据结果,所以这类应用最为常见,也是我们默认的推送方式,在我们的Demo程序中,消息乱序但消息不丢失,我们发送的10条消息客户端Websocket收到可能顺序是这样的:

推送消息丢失

发送一个信号即便当前URL不可用也是可忽略的推送消息,我们使用该场景,如下云平台推送了10条消息,但在客户端只收到了2条,因为期间第三方URL有中断或返回失败值的情况:

感谢阅读!


你可以继续阅读:对微服务的理解以及实现一套微服务对外发布API管理平台 | 项目开发中常用的设计模式整理 | 异构语言调用平台的设计与实现 | 大话正则表达式 | 云API平台的设计与实现 | 个税改了,工资少了,不要慌!文末附计算器



关注我们的公众号 

长按识别二维码关注我们


文章转载自Sumslack团队,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论