点击蓝字
关注我们
暑期2021项目研发正在火热进行中,开源之夏公众号面向广大社区及项目承担学生征稿,欢迎大家热情分享:
发送投稿文章至官方联络邮箱:
summer@iscas.ac.cn
添加公众号小编微信投稿:

本期分享来自 Apache RocketMQ 社区的周全同学(Dapr集成RocketMQ)的项目经验。
项目名称
实现 RocketMQ 与 Dapr 的集成
项目综述
• Dapr 是一个可移植的、事件驱动的运行时,它使任何开发人员能够轻松构建出弹性的、无状态和有状态的应用程序,并可运行在云平台或边缘计算中,它同时也支持多种编程语言和开发框架。Dapr将构建微服务应用的最佳实践设计成开放、独立和模块化的方式。
• RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
• 本次项目的需求是为Dapr提供RocketMQ的集成,实现PubSub模块。
项目步骤
• 明确项目要求并理解意义
• 学习各模块相应知识,理解整体框架
• 依据方案,解决问题
项目实施
理解项目要求,理解整体框架
• Dapr:分布式应用运行时,将开发过程中与业务逻辑无关的分布式能力拆分出来,作为独立的runtime,让系统解耦。本次项目针对rocketmq,目的在于实现分布式消息订阅发布能力
• Dapr通过对分布式能力的抽象和归类,总结出以下几大类:

• RocketMQ:分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务,在架构上主要分为四部分:

• Dapr PubSub模块

参考资料及开源实现,完成调研
• 对于PubSub模块,Dapr已经抽象出PubSub接口供各组件去实现功能,我们首先参考社区内已完成集成的组件,例如kafka,pulsar,rabbitmq
• 在Dapr社区内已有基于第三方客户端的RocketMQ集成,我们首先参考开源的实现,理解其设计思路并总结出需要改善的点
• 项目结构:
rocketmq.go 作为实现功能的主要文件,settings.go 主要作为Dapr规定的MetaData存放配置信息

• Settings结构体用来存放配置信息,与用户定义的components\pubsub.yaml配置文件中的metadata相对应

• 初始化组件Init()
通过解析入参metadata,对rocketmq的produer,consumer等成员进行初始化
• 发布消息Publish()
根据Dapr指定的接口入参*pubsub.PublishRequest 解析出topic, data信息,再配合metadata中的字段,为消息message提供tag,key等附加信息,调用rocketmq-client-go暴露的API实现发送功能
• 订阅主题Subscribe()
PubSub接口的订阅功能提供两个入参:pubsub.SubscribeRequest, pubsub.Handler
在rocketmq的客户端中可以看到Subscribe函数签名:
Subscribe(topic string, selector consumer.MessageSelector, f func(context.Context, ...*primitive.MessageExt) (mqc.ConsumeResult, error)) error
项目实施,解决问题
• 首先是定义metadata结构,这里自定义资源定义(CRD)文件相对应,例如官方示例
apiVersion: dapr.io/v1alpha1kind: Componentmetadata:name: pubsubnamespace: defaultspec:type: pubsub.redisversion: v1metadata:- name: redisHostvalue: localhost:6379- name: redisPasswordvalue: ""
本项目中结合RocketMQ特性,约定了如下结构

• 初始化rocketMQ组件,解析配置文件到metadata字段

• 根据配置参数,对producer进行初始化

• 根据配置参数,对consumer进行初始化

• Publish接口部分实现

• Subscribe接口部分实现

项目测试
由于项目仍在编写过程中,本次测试以官网的示例,待开发完毕后对RocketMQ组件进行测试
以Windows环境为例:
下载并安装 Dapr CLI
powershell -Command "iwr -useb https://raw.githubusercontent.com/dapr/cli/master/install/install.ps1 | iex"
验证Dapr安装
dapr
显示如图所示,说明安装成功

初始化Dapr
dapr init
安装最新的 Dapr 运行时二进制程序

验证容器运行状态
docker ps
确保镜像为daprio/dapr, openzipkin/zipkin和 redis的容器都在运行

验证组件目录初始化
打开%USERPROFILE%\.dapr\ 文件夹

运行Dapr sidecar
运行以下命令以启动 Dapr sidecar,它将在端口 3500 上监听名为 myapp 的空白应用程序
https://internal-api-drive-stream.feishu.cn/space/api/box/stream/download/all/boxcnujGgIz4Us9XPM3ui753LCg/?mount_node_token=doccn4PgbLRoGzONL4FfRTETSkb&mount_point=doc_image
保存状态
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{ "key": "name", "value": "Bruce Wayne"}]' -Uri 'http://localhost:3500/v1.0/state/statestore'
获取状态
Invoke-RestMethod -Uri 'http://localhost:3500/v1.0/state/statestore/name'
根据指定的key:name,获取到value:Bruce Wayne

项目规划
• 在当前版本的基础上进一步完善功能,丰富单元测试
• 定义一个rocketmq组件,本地运行Dapr sidecar进行PubSub测试
• 完成 RocketMQ binding的实现
本文为周全同学原创文章,欢迎大家投稿





