暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

项目经验分享:Dapr集成RocketMQ

开源之夏 2021-08-04
1862

点击蓝字

关注我们


暑期2021项目研发正在火热进行中,开源之夏公众号面向广大社区及项目承担学生征稿,欢迎大家热情分享:

发送投稿文章至官方联络邮箱:

summer@iscas.ac.cn

添加公众号小编微信投稿:


本期分享来自 Apache RocketMQ 社区的周全同学(Dapr集成RocketMQ)的项目经验。



项目名称

实现 RocketMQ 与 Dapr 的集成


项目综述

• Dapr 是一个可移植的、事件驱动的运行时,它使任何开发人员能够轻松构建出弹性的、无状态和有状态的应用程序,并可运行在云平台或边缘计算中,它同时也支持多种编程语言和开发框架。Dapr将构建微服务应用的最佳实践设计成开放、独立和模块化的方式。

• RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。

• 本次项目的需求是为Dapr提供RocketMQ的集成,实现PubSub模块。

项目步骤

• 明确项目要求并理解意义

• 学习各模块相应知识,理解整体框架

• 依据方案,解决问题

项目实施

1

理解项目要求,理解整体框架

• Dapr:分布式应用运行时,将开发过程中与业务逻辑无关的分布式能力拆分出来,作为独立的runtime,让系统解耦。本次项目针对rocketmq,目的在于实现分布式消息订阅发布能力

• Dapr通过对分布式能力的抽象和归类,总结出以下几大类:

• RocketMQ:分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务,在架构上主要分为四部分:

Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费
NameServer:Topic路由注册中心,支持Broker的动态注册与发现,主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。提供心跳检测机制,检查Broker是否还存活
BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证

 Dapr PubSub模块

Dapr Pub/Sub 构建块提供一个平台不可知(platform-agnositc)的 API 来发送和接收消息。服务发布消息到一个命名主题(named topic),并且也订阅一个 topic 来消费消息。
服务让网络调用 Dapr 的 Pub/Sub 构建块,暴露为一个 sidecar。然后,这个构建块调用封装了一个特定的 message broker 产品的 Dapr Pub/Sub 组件。要接收 Topics,Dapr 代表服务订阅 Dapr Pub/Sub 组件,并在消息到达时将其发送到端点

2

参考资料及开源实现,完成调研

• 对于PubSub模块,Dapr已经抽象出PubSub接口供各组件去实现功能,我们首先参考社区内已完成集成的组件,例如kafka,pulsar,rabbitmq

• 在Dapr社区内已有基于第三方客户端的RocketMQ集成,我们首先参考开源的实现,理解其设计思路并总结出需要改善的点

• 项目结构:

rocketmq.go 作为实现功能的主要文件,settings.go 主要作为Dapr规定的MetaData存放配置信息

• Settings结构体用来存放配置信息,与用户定义的components\pubsub.yaml配置文件中的metadata相对应

• 初始化组件Init()

通过解析入参metadata,对rocketmq的produer,consumer等成员进行初始化

• 发布消息Publish()

根据Dapr指定的接口入参*pubsub.PublishRequest 解析出topic, data信息,再配合metadata中的字段,为消息message提供tag,key等附加信息,调用rocketmq-client-go暴露的API实现发送功能

• 订阅主题Subscribe()

PubSub接口的订阅功能提供两个入参:pubsub.SubscribeRequest, pubsub.Handler

在rocketmq的客户端中可以看到Subscribe函数签名:

Subscribe(topic string, selector consumer.MessageSelector, f func(context.Context, ...*primitive.MessageExt) (mqc.ConsumeResult, error)) error
topic是订阅的话题,由入参request可得到
Selector是对订阅消息的过滤机制
F 是对消息的处理函数,对入参handler进行封装返回rocketmq规定的处理函数

3

项目实施,解决问题

• 首先是定义metadata结构,这里自定义资源定义(CRD)文件相对应,例如官方示例

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""

本项目中结合RocketMQ特性,约定了如下结构

• 初始化rocketMQ组件,解析配置文件到metadata字段

• 根据配置参数,对producer进行初始化

• 根据配置参数,对consumer进行初始化

• Publish接口部分实现

• Subscribe接口部分实现

项目测试

由于项目仍在编写过程中,本次测试以官网的示例,待开发完毕后对RocketMQ组件进行测试

以Windows环境为例:

1

下载并安装 Dapr CLI

powershell -Command "iwr -useb https://raw.githubusercontent.com/dapr/cli/master/install/install.ps1 | iex"


2

验证Dapr安装

dapr

显示如图所示,说明安装成功

3

初始化Dapr

dapr init

安装最新的 Dapr 运行时二进制程序

4

验证容器运行状态

docker ps

确保镜像为daprio/dapr, openzipkin/zipkinredis的容器都在运行

5

验证组件目录初始化

打开%USERPROFILE%\.dapr\ 文件夹

6

运行Dapr sidecar

运行以下命令以启动 Dapr sidecar,它将在端口 3500 上监听名为 myapp 的空白应用程序

https://internal-api-drive-stream.feishu.cn/space/api/box/stream/download/all/boxcnujGgIz4Us9XPM3ui753LCg/?mount_node_token=doccn4PgbLRoGzONL4FfRTETSkb&mount_point=doc_image

7

保存状态

Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{ "key": "name", "value": "Bruce Wayne"}]' -Uri 'http://localhost:3500/v1.0/state/statestore'

8

获取状态

Invoke-RestMethod -Uri 'http://localhost:3500/v1.0/state/statestore/name'

根据指定的key:name,获取到value:Bruce Wayne

项目规划

• 在当前版本的基础上进一步完善功能,丰富单元测试

• 定义一个rocketmq组件,本地运行Dapr sidecar进行PubSub测试

• 完成 RocketMQ binding的实现



本文为周全同学原创文章,欢迎大家投稿




文章转载自开源之夏,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论