
过滤示例
在大多数情况下,TAG是一个简单而有用的设计来选择您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。
原理
SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些有趣的逻辑。下面是一个例子:
------------ | message | |----------| a > 5 AND b = 'abc' | a = 10 | --------------------> Gotten | b = 'abc'| | c = true | ------------ ------------ | message | |----------| a > 5 AND b = 'abc' | a = 1 | --------------------> Missed | b = 'abc'| | c = true | ------------
语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以轻松但扩展它。
- 数字比较,像
>
,>
,<
,<=
,BETWEEN
,=; - 字符比较,类似
=
,<>
,IN
; IS NULL
或IS NOT NULL
;- 逻辑的
AND
,OR
,NOT
;
常量类型:
- 数字, 如123, 3.1415;
- 字符, 如‘abc’, 必须用单引号;
NULL
, 特殊常量;- Boolean,
TRUE
orFALSE
;
使用限制
只有推送用户才能使用SQL92来选择消息。接口为:
public void subscribe(final String topic, final MessageSelector messageSelector)
消费者示例
你可以在发送时通过方法putUserProperty
把属性放入消息中。
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );// Set some properties.msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); producer.shutdown();
消费者示例
消费时使用MessageSelector.bySql
通过SQL92选择消息。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// only subsribe messages have property a, also a >=0 and a <= 3consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
文章转载自evilRat,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




