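RocketMQ is a messaging and streaming platform open-sourced by Alibaba and now a top-level Apache project. It offers low latency, high performance, high reliability, trillion-scale message storage, and flexible scalability, and it is widely used in enterprises.
RocketMQ consists of four core components: name servers, brokers, producers, and consumers.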

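Every component can be scaled horizontally. The key component for sending and receiving messages is the Broker, which runs in either a Master or a Slave role. This article covers deployment and day-to-day operations, and shows how to maintain RocketMQ in a cloud-native way with an Operator.
The official Operator is available at: https://github.com/apache/rocketmq-operator.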
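Deployment
By default, RocketMQ is deployed in multi-master, multi-slave mode, which provides high availability through master/slave pairs. Starting with version 4.5.0, RocketMQ also supports the RocketMQ-on-DLedger Group mode, in which the nodes of a group elect a master through the Raft protocol. If the master fails, a new election automatically fails over to another node.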
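The failover guarantee rests on Raft's majority rule: a leader is elected, and entries are committed, only with votes from a majority of the group. A minimal sketch of that arithmetic (the function name is ours, purely illustrative) shows how many node failures a DLedger group of a given size can survive:

```python
from __future__ import annotations


def raft_quorum(group_size: int) -> tuple[int, int]:
    """Return (votes needed for a majority, node failures tolerated) for a Raft group."""
    if group_size < 1:
        raise ValueError("group_size must be at least 1")
    majority = group_size // 2 + 1      # votes needed to elect a master or commit an entry
    tolerated = group_size - majority   # nodes that can fail while a majority remains
    return majority, tolerated


for n in (2, 3, 5):
    print(n, raft_quorum(n))
```

A 2-node group tolerates no failure, a 3-node group tolerates one, and a 5-node group tolerates two, so a group needs at least three nodes before automatic failover can survive losing a node.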

The RocketMQ Operator defines a separate CRD for each component, describing how that component is deployed and operated.
nameserver
apiVersion: rocketmq.apache.org/v1alpha1
kind: NameService
metadata:
  name: name-service
  namespace: default
spec:
  # size is the name service instance number of the name service cluster
  size: 1
  # nameServiceImage is the customized docker image repo of the RocketMQ name service
  nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0
  # imagePullPolicy is the image pull policy
  imagePullPolicy: Always
  # hostNetwork can be true or false
  hostNetwork: true
  # Set DNS policy for the pod.
  # Defaults to "ClusterFirst".
  # Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'.
  # DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy.
  # To have DNS options set along with hostNetwork, you have to specify DNS policy
  # explicitly to 'ClusterFirstWithHostNet'.
  dnsPolicy: ClusterFirstWithHostNet
  # resources describes the compute resource requirements and limits
  resources:
    requests:
      memory: "512Mi"
      cpu: "250m"
    limits:
      memory: "1024Mi"
      cpu: "500m"
  # storageMode can be EmptyDir, HostPath, StorageClass
  storageMode: EmptyDir
  # hostPath is the local path to store data
  hostPath: /data/rocketmq/nameserver
  # volumeClaimTemplates defines the storageClass
  volumeClaimTemplates:
    - metadata:
        name: namesrv-storage
      spec:
        accessModes:
          - ReadWriteOnce
        storageClassName: rocketmq-storage
        resources:
          requests:
            storage: 1Gi
You can customize the number of NameServer nodes, the image, resource limits, the storage type, and more.
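The comments in the manifest above carry a few constraints that are easy to get wrong, notably that `hostNetwork: true` needs `dnsPolicy: ClusterFirstWithHostNet` to keep using cluster DNS. A minimal sketch of a pre-flight check over a NameService `spec` loaded as a dict; the function is hypothetical, not part of the Operator, and the rule that each storage mode needs its matching field is our assumption:

```python
from __future__ import annotations

VALID_STORAGE_MODES = {"EmptyDir", "HostPath", "StorageClass"}
VALID_DNS_POLICIES = {"ClusterFirstWithHostNet", "ClusterFirst", "Default", "None"}


def check_nameservice_spec(spec: dict) -> list[str]:
    """Hypothetical pre-flight check for a NameService spec; returns a list of problems."""
    problems = []
    if spec.get("size", 0) < 1:
        problems.append("size must be at least 1")
    mode = spec.get("storageMode")
    if mode not in VALID_STORAGE_MODES:
        problems.append(f"unknown storageMode: {mode!r}")
    # Assumption: each storage mode needs its matching field to be filled in.
    if mode == "HostPath" and not spec.get("hostPath"):
        problems.append("storageMode HostPath requires hostPath")
    if mode == "StorageClass" and not spec.get("volumeClaimTemplates"):
        problems.append("storageMode StorageClass requires volumeClaimTemplates")
    dns = spec.get("dnsPolicy", "ClusterFirst")  # Kubernetes default
    if dns not in VALID_DNS_POLICIES:
        problems.append(f"unknown dnsPolicy: {dns!r}")
    # With hostNetwork, Kubernetes only uses cluster DNS under ClusterFirstWithHostNet.
    if spec.get("hostNetwork") and dns != "ClusterFirstWithHostNet":
        problems.append("hostNetwork: true should use dnsPolicy ClusterFirstWithHostNet")
    return problems


spec = {"size": 1, "hostNetwork": True, "dnsPolicy": "ClusterFirstWithHostNet",
        "storageMode": "EmptyDir"}
print(check_nameservice_spec(spec))
```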
broker
apiVersion: v1
kind: ConfigMap
metadata:
  name: broker-config
  namespace: default
data:
  # BROKER_MEM sets the broker JVM, if set to "" then Xms = Xmx = max(min(1/2 ram, 1024MB), min(1/4 ram, 8GB))
  BROKER_MEM: " -Xms2g -Xmx2g -Xmn1g "
  broker-common.conf: |
    # brokerClusterName, brokerName, brokerId are automatically generated by the operator; do not set them manually!!!
    deleteWhen=04
    fileReservedTime=48
    flushDiskType=ASYNC_FLUSH
    # set brokerRole to ASYNC_MASTER or SYNC_MASTER. DO NOT set to SLAVE because the replica instance will automatically be set!!!
    brokerRole=ASYNC_MASTER
---
apiVersion: rocketmq.apache.org/v1alpha1
kind: Broker
metadata:
  # name of broker cluster
  name: broker
  namespace: default
spec:
  # size is the number of broker clusters; each broker cluster contains a master broker and [replicaPerGroup] replica brokers.
  size: 1
  # nameServers is the [ip:port] list of name service
  nameServers: ""
  # replicaPerGroup is the number of replica brokers in each broker cluster
  replicaPerGroup: 1
  # brokerImage is the customized docker image repo of the RocketMQ broker
  brokerImage: apacherocketmq/rocketmq-broker:4.5.0-alpine-operator-0.3.0
  # imagePullPolicy is the image pull policy
  imagePullPolicy: Always
  # resources describes the compute resource requirements and limits
  resources:
    requests:
      memory: "2048Mi"
      cpu: "250m"
    limits:
      memory: "12288Mi"
      cpu: "500m"
  # allowRestart defines whether allow pod restart
  allowRestart: true
  # storageMode can be EmptyDir, HostPath, StorageClass
  storageMode: EmptyDir
  # hostPath is the local path to store data
  hostPath: /data/rocketmq/broker
  # scalePodName is [Broker name]-[broker group number]-master-0
  scalePodName: broker-0-master-0
  # env defines custom env, e.g. BROKER_MEM
  env:
    - name: BROKER_MEM
      valueFrom:
        configMapKeyRef:
          name: broker-config
          key: BROKER_MEM
  # volumes defines the broker.conf
  volumes:
    - name: broker-config
      configMap:
        name: broker-config
        items:
          - key: broker-common.conf
            path: broker-common.conf
  # volumeClaimTemplates defines the storageClass
  volumeClaimTemplates:
    - metadata:
        name: broker-storage
      spec:
        accessModes:
          - ReadWriteOnce
        storageClassName: rocketmq-storage
        resources:
          requests:
            storage: 8Gi
You can define the broker configuration, the number of master/slave clusters, resource limits, the storage type, and more.
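The `BROKER_MEM` comment in the ConfigMap gives the default heap used when the value is empty: Xms = Xmx = max(min(1/2 ram, 1024MB), min(1/4 ram, 8GB)). A minimal sketch of that arithmetic in MB, assuming "ram" is the memory the broker's start script sees (whether that is host RAM or the container limit is not stated here); the function name is illustrative:

```python
def default_broker_heap_mb(ram_mb: int) -> int:
    """Heap size (Xms = Xmx) in MB used when BROKER_MEM is empty, per the ConfigMap comment."""
    half_capped = min(ram_mb // 2, 1024)          # min(1/2 ram, 1024MB)
    quarter_capped = min(ram_mb // 4, 8 * 1024)   # min(1/4 ram, 8GB)
    return max(half_capped, quarter_capped)


for ram in (1024, 2048, 12288, 65536):
    print(ram, default_broker_heap_mb(ram))
```

With 2048 MB the 1 GB cap wins (1024 MB heap); with 12288 MB, the limit in the example above, a quarter of RAM wins (3072 MB); beyond 32 GB the heap stays at 8192 MB. The example ConfigMap sets `BROKER_MEM` explicitly to 2g, so the formula only applies when the value is left empty.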
RocketMQ Console
RocketMQ Console is the management console for RocketMQ; you can use it to inspect how messages are being produced and consumed.
apiVersion: rocketmq.apache.org/v1alpha1
kind: Console
metadata:
  name: console
  namespace: default
spec:
  # nameServers is the [ip:port] list of name service
  nameServers: ""
  # consoleDeployment defines the console deployment
  consoleDeployment:
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      labels:
        app: rocketmq-console
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: rocketmq-console
      template:
        metadata:
          labels:
            app: rocketmq-console
        spec:
          containers:
            - name: console
              image: apacherocketmq/rocketmq-console:2.0.0
              ports:
                - containerPort: 8080
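The Operator is built on the Kubernetes list-watch mechanism: when a CR's configuration changes, the RocketMQ Operator is triggered to carry out the corresponding operations, such as creating the components (StatefulSets, Deployments), Services, and ConfigMaps, or updating the existing deployments. All of this is driven through a declarative API.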
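The core of this pattern is a level-triggered reconcile step: on every watch event, compare the desired state in the CR with what is observed in the cluster and emit only the actions needed to close the gap. A minimal sketch under simplifying assumptions; the types and the action strings are hypothetical, and a real controller would call the Kubernetes API instead of returning strings:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class NameServiceSpec:
    """Desired state taken from the CR (hypothetical subset of fields)."""
    name: str
    size: int
    image: str


@dataclass
class ObservedStatefulSet:
    """What the controller currently sees in the cluster."""
    replicas: int
    image: str


def reconcile(spec: NameServiceSpec, observed: Optional[ObservedStatefulSet]) -> list[str]:
    """Return the actions that move the observed state toward the desired state."""
    if observed is None:
        return [f"create StatefulSet {spec.name} replicas={spec.size} image={spec.image}"]
    actions = []
    if observed.replicas != spec.size:
        actions.append(f"scale StatefulSet {spec.name} {observed.replicas}->{spec.size}")
    if observed.image != spec.image:
        actions.append(f"update StatefulSet {spec.name} image={spec.image}")
    return actions


desired = NameServiceSpec("name-service", 3, "apacherocketmq/rocketmq-nameserver:4.5.0-alpine-operator-0.3.0")
print(reconcile(desired, ObservedStatefulSet(1, desired.image)))
```

Because each pass recomputes the diff from the full state rather than from the event itself, repeated or missed events are harmless: running the step again on an up-to-date cluster returns no actions.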
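The open-source RocketMQ Operator only supports the master/slave deployment mode, while the RocketMQ-on-DLedger Group mode provides stronger high availability. We therefore added RocketMQ-on-DLedger Group deployment on top of the open-source Operator, including support for switching between deployment modes. During deployment, the Operator automatically resolves the IP addresses of the nodes in the same group and injects them as environment variables, which reduces manual maintenance.
In RocketMQ-on-DLedger Group mode, the dLegerPeers parameter can in principle be set to service names. However, RocketMQ 4.5.0 has a bug (https://github.com/apache/rocketmq/issues/2384). Although it was fixed in 4.8.0, our business teams all run 4.5.0, so we worked around the issue by resolving the service addresses to IPs.
If you are interested in the implementation, see: https://github.com/apache/rocketmq-operator/pull/87.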
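In RocketMQ's DLedger configuration, `dLegerPeers` lists every node of the group as `n<index>-<ip>:<port>` separated by semicolons, and each broker's `dLegerSelfId` must be one of those ids. A minimal sketch of the workaround described above, resolving hostnames to IPs first and then building the per-node settings; the helper names and the port are illustrative assumptions, and this is not the code from the pull request:

```python
from __future__ import annotations

import socket
from typing import Callable


def resolve_all(hostnames: list[str],
                resolver: Callable[[str], str] = socket.gethostbyname) -> list[str]:
    """Resolve each hostname to an IP, so dLegerPeers never contains a service name."""
    return [resolver(h) for h in hostnames]


def build_dleger_peers(ips: list[str], port: int) -> str:
    """Build e.g. "n0-10.0.0.1:40911;n1-10.0.0.2:40911"; ips must be ordered by node index."""
    if not ips:
        raise ValueError("a DLedger group needs at least one node")
    return ";".join(f"n{i}-{ip}:{port}" for i, ip in enumerate(ips))


def dleger_config(group: str, ips: list[str], port: int, self_index: int) -> dict[str, str]:
    """DLedger settings for the broker at position self_index in the group (hypothetical helper)."""
    if not 0 <= self_index < len(ips):
        raise IndexError("self_index must point at one of the peers")
    return {
        "enableDLegerCommitLog": "true",
        "dLegerGroup": group,
        "dLegerPeers": build_dleger_peers(ips, port),
        "dLegerSelfId": f"n{self_index}",
    }


print(dleger_config("broker-0", ["10.0.0.1", "10.0.0.2", "10.0.0.3"], 40911, 0))
```

Passing the resolver in as a parameter keeps the DNS lookup out of the string-building logic, so the peer list can be checked without a live cluster.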
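Operations
Horizontal scaling
With the RocketMQ Operator, NameServer and Broker clusters can easily be scaled horizontally.
NameServer scaling
To scale the NameServer cluster horizontally, you only need to change size in the NameService CR. Once the new NameServer nodes are up, the Operator automatically notifies all Brokers to update their NameServer list, so that the Brokers register with the new NameServer nodes.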
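A Broker's NameServer list is a `namesrvAddr` string of semicolon-separated `ip:port` pairs, and 9876 is the NameServer's default port. A minimal sketch of how an operator could work out which Brokers need the new list after a scale-out; the helper names and the input shapes are hypothetical, and how the real Operator pushes the update is not shown here:

```python
from __future__ import annotations

NAMESRV_PORT = 9876  # RocketMQ NameServer's default listen port


def namesrv_addr(pod_ips: list[str], port: int = NAMESRV_PORT) -> str:
    """Format NameServer pod IPs as a namesrvAddr value ("ip1:port;ip2:port")."""
    # Sort so the same set of pods always yields the same string.
    return ";".join(f"{ip}:{port}" for ip in sorted(pod_ips))


def brokers_to_update(current: dict[str, str], pod_ips: list[str]) -> list[str]:
    """Return the brokers whose current namesrvAddr differs from the desired one."""
    desired = namesrv_addr(pod_ips)
    return sorted(broker for broker, addr in current.items() if addr != desired)


current = {"broker-a": "10.0.0.1:9876", "broker-b": "10.0.0.1:9876;10.0.0.2:9876"}
print(brokers_to_update(current, ["10.0.0.2", "10.0.0.1"]))
```

Only `broker-a` still has the one-node list, so it is the only Broker that needs to be told about the new NameServer.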
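Broker cluster scaling
A Broker cluster usually consists of multiple nodes. You can scale the clusters by changing size in the Broker CR; scalePodName specifies which node the newly added cluster synchronizes its metadata (Topic and subscription information) from; and changing replicaPerGroup scales the slave nodes of each Broker cluster.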
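From the Broker CR comments, each of the `size` clusters runs one master plus `replicaPerGroup` replicas, and master pods follow the pattern `[Broker name]-[broker group number]-master-0`. A minimal sketch of the resulting pod count and master names, with a check that `scalePodName` points at a master that already exists before the scale-out (that check is our assumption about sensible usage, not an Operator rule); all function names are illustrative:

```python
from __future__ import annotations


def broker_pod_count(size: int, replica_per_group: int) -> int:
    """Each of the `size` broker clusters runs one master plus `replicaPerGroup` replicas."""
    return size * (1 + replica_per_group)


def master_pod_names(broker_name: str, size: int) -> list[str]:
    """Master pod names following the [Broker name]-[group number]-master-0 pattern."""
    return [f"{broker_name}-{group}-master-0" for group in range(size)]


def valid_scale_source(broker_name: str, current_size: int, scale_pod_name: str) -> bool:
    """Assumption: metadata should be copied from a master that exists before scaling."""
    return scale_pod_name in master_pod_names(broker_name, current_size)


# Scaling the example Broker CR from size 1 to size 2, with replicaPerGroup 1:
print(broker_pod_count(1, 1), "->", broker_pod_count(2, 1))
print(master_pod_names("broker", 2))
print(valid_scale_source("broker", 1, "broker-0-master-0"))
```

The new master `broker-1-master-0` would copy its Topic and subscription metadata from `broker-0-master-0`, the `scalePodName` in the example CR.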
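Topic migration
Topic migration means moving the serving of a Topic from a source cluster to a target cluster without affecting the business. This typically happens when a user wants to decommission the source cluster or relieve its workload. Topic migration usually consists of the following 7 steps:
Add all consumer groups of the Topic being migrated to the target cluster.
Add the Topic being migrated to the target cluster.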
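The two steps listed above have a fixed order: the Topic's consumer groups go to the target cluster first, then the Topic itself. A minimal sketch of that ordering against an in-memory stand-in for cluster metadata; the `Cluster` type and `prepare_target` function are hypothetical and do not model RocketMQ's admin API, and the remaining migration steps are not covered:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Cluster:
    """In-memory stand-in for a RocketMQ cluster's metadata (hypothetical)."""
    name: str
    topics: set[str] = field(default_factory=set)
    consumer_groups: set[str] = field(default_factory=set)


def prepare_target(source: Cluster, target: Cluster, topic: str, groups: set[str]) -> list[str]:
    """Run the two listed steps in order and return a log of what was added."""
    if topic not in source.topics:
        raise KeyError(f"{topic} not found in {source.name}")
    log = []
    for group in sorted(groups):          # step 1: the Topic's consumer groups
        target.consumer_groups.add(group)
        log.append(f"group {group} -> {target.name}")
    target.topics.add(topic)              # step 2: the Topic itself
    log.append(f"topic {topic} -> {target.name}")
    return log


src = Cluster("source", {"OrderTopic"}, {"order-consumer"})
dst = Cluster("target")
print(prepare_target(src, dst, "OrderTopic", {"order-consumer"}))
```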
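Business management
Day-to-day business management of MQ is done through the RocketMQ Console installed by the RocketMQ Operator, including creating Topics, querying message traces, resetting consumer offsets, and querying producers, consumers, and message bodies.
Summary
By wrapping RocketMQ's day-to-day operational capabilities, the RocketMQ Operator greatly reduces the time spent on routine deployment and operations and makes operations staff more productive. The approach also serves as a reference for operating other services: following the Operator development pattern, their operational capabilities can be encapsulated in the same way to improve efficiency.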