暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SeaTunnel 支持 IoTDB,玩转物联网数据同步!

SeaTunnel 2022-11-08
2065

 点亮 ⭐️ Star · 照亮开源之路

https://github.com/apache/incubator-seatunnel


Meetup 10/15

Apache IoTDB(物联网数据库)是一体化收集、存储、管理与分析物联网时序数据的软件系统,可以满足工业物联网领域的海量数据存储、高速数据读取和复杂数据分析需求。目前,SeaTunnel 已经支持 IoTDB Connector,实现了物联网领域数据同步场景接通。


在今年 10 月份 SeaTunnel 社区线上见面会上,SeaTunnel Committer 王海林 为大家介绍了 SeaTunnel 接入 IoTDB 的实现过程,让用户更深刻地理解 IoTDB 数据同步的操作方法和原理。


我今天分享的主题是使用 SeaTunnel 玩转 IoTDB 的数据同步。本次分享分为 6 小节。首先会让大家对  SeaTunnel 的基本概念有一个了解,并在此基础上着重介绍一下 IoTDB Connector 的功能特性,随后会深入分析一下 IoTDB Connector 的数据读取和数据写入功能以及实现的解析,最后会展示一些典型的使用场景和案例,让大家了解如何使用 IoTDB Connector 落地到生产环境。最后一点是社区对 IoTDB Connector 的下一步规划,并指导大家如何参与贡献。

01

SeaTunnel 基本概念介绍


这是 SeaTunnel 基本的架构图,这是一个为数据同步而生的引擎,为了从各式各样的数据源读取数据并写入到各式各样的数据源,SeaTunnel 做了一套抽象的 API。


左侧简单罗列了 Source 的场景,比如我们抽象了 Source  的 API,Type 和 State,来读取数据源,把各种数据源的数据类型统一到在其定义的抽象类型上,并读取过程中的一些状态恢复和读取位置的保留。

这是对于 Source 的抽象,对于 Sink 我们也做了类似的抽象,即如何写入数据,以及数据类型如何匹配到真实的数据源类型,以及状态的恢复和保留。

基于这些API,我们会有一个 translation 层把这些API翻译到对应的执行引擎上。目前 SeaTunnel 支持三个执行引擎,Spark,Flink,以及即将发布的自研执行引擎 SeaTunnel Engine。

这大致就是 SeaTunnel 所做的事。SeaTunnel 做数据同步依赖  Source 和 Sink 读取和写入数据,我们把它们叫做Connector 连接器。连接器由 Source 和 Sink 组成。


从上图我们看到不同的数据源,Source 负责从各种数据源中读取数据,将其转化成SeaTunnelRow 抽象层(匹配 SeaTunnel 定义的数据类型),Sink 负责从抽象层上拉取数据,写到具体的数据存储上,转化成存储具体的格式。

通过  Source +抽象层+Sink 这样的组合,就能够完成多种异构数据源之间数据的搬运同步。

下面我用一个简单的实例说明一下 SeaTunnel 的 Source 和 Sink 如何运转。


我们通过配置文件可以指定一些 Source,Sink 配置文件组合通过SeaTunnel 提供的工具包中的命令带上配置文件,执行后可以实现数据搬运。


这是目前 SeaTunnel 已经支持的 Connector 生态,比如  JBDC 支持的数据源,HDFS,  Hive, Pulsar,消息队列等目前都有支持。

图中所列并不是 SeaTunnel 支持 Connector 的全部,GitHub SeaTunnel 项目下可以看到插件目录,支持的  Connector  插件在不断增加,在这里可以实时看到最新的接入情况。

02

IoTDB Connector 功能特性


下面是关于 IoTDB Connector 的接入情况。

首先介绍下 IoTDB 的功能特性,也就是 SeaTunnel 集成的 IoTDB Connector,它到底支持哪些功能,为大家提供参考。

01

Source 功能特性


首先是 Source 支持的典型的使用场景,比如批量读取 device、字段投影、数据类型映射、并行读取等。

如上图所示,IoTDB 除了不支持至多一次、刚好一次 和流模式之外,其他功能是都支持的,比如批量读取, IoTDB 有一个类似于group by device的 SQL 语法,可以把多个设备的数据统一进行批次读取。基本的数据类型投影,在IoTDB 的SQL 在查任何指标时会默认带上 time,或 group by device 会带上device 列,我们也默认支持投影到SeaTunnel 的列上。

数据类型只有 Victor 不支持,其他的都支持。

关于并行读取这一块,IoTDB 数据中实际上是有时间戳的,我们用时间戳划分范围来实现并行读取。

状态的恢复,因为我们对读取的时间范围划分了不同的 split,所以可以根据 Split 位置信息进行恢复。

02

Sink功能特性



上图为 SeaTunnel 已支持的功能。关于元数据提取,我们支持从 SeaTunnelRow 中提取 measurement、device 等元数据,支持从  SeaTunnelRow 中提取或使用当前处理时间。批量提交、异常重试也是支持的。

03

IoTDB数据读取解析 


接下来我们解析一下数据读取的实现和支持。

01

数据类型映射


首先是数据类型映射,实际上是把 IoTDB 的数据类型读到 SeaTunnel 上,所以要转化成 SeaTunnel 的数据类型。

这里我们列出来的BOOLEAN、INT32、INT64 等都有对应的 SeaTunnel 数据类型。其中  INT32 是可以根据 SeaTunnel 上的读取类型进行映射的,当值的范围比较小时,也可以映射成TINYINT、SMALLINT或 INT。

Vector 类型目前还没有支持。

这是对应的示例代码,展示类型转换的地方如何做映射。

02

字段投影


另一个是读取时的字段投影,我们在读 IoTDB  数据时,可以自动映射 Time 字段,也可以选择部分数据映射到 SeaTunnel 上,比如TIMESTAMP、BIGINT。


通过SQL提取列码,可以只提取部分需要的列,在 SeaTunnel 上使用时,可以通过 feilds 来指定列映射到 SeaTunnel 后的名字、类型等。最后读取到SeaTunnel上的数据结果如上图所示。


刚刚大家其实看到了,我们SQL里面没有查time这个列,但实际查出结果是有这个列的,所以我们这投影是支持对 time列的字段进行投影的,Time 列实际上可以投影成不同的数据类型,用户可以根据自己的需要来进行转换。上图为实现逻辑。

03

批量读取Device


读取还涉及到批量读取 device。这是一个常见的需求,因为我们同步数据的时候可能都是大批量地做同数据结构的数据同步。


SeaTunnel支持 align by device语法,这样就可以把 device列也投影到 SeaTunnelRow上。

假设  IoTDB 中有一张表,我们通过语法把  device 列也做成数据,投影到SeaTunnel上,配置了device name列并指定数据类型之后,我们最终读到 SeaTunnel 上的数据格式如上图所示,包含Time、device列,以及实际的数据值。这样可以做到批量读取相同的device的数据。

04

并行读取


另一个是并行读取。

  • 切分 Split

我们通过 Time列进行范围划分,如果要并行读取,我们可能要对这张表的数据范围进行划分,让并行的线程/进程读取特定范围的数据。按照 Time 范围划分,我们设置了 三个配置,lower_bound、upper_bound 和 num_partitions。通过配置三个参数,最终效果就会是据此转化成查询的 SQL,原始的 SQL 会加上查询条件,划分成不同的 split 达成实际的读取的 SQL。

  • 分配  Split 给读取 reader

Split 分好了,要分给每一个并行的 reader, 需要遵循一个分配逻辑。

这个逻辑就是根据 Split 中的 ID  向 reader取模,这个可能有较大的随机性,如果 split 的 ID 比较散列的话就会比较均匀,要根据 Connector 的具体情况实现。

实现的效果如图所示。

05

状态恢复


在读取的时候还会涉及到状态恢复,因为如果任务比较大,读取的时间会较长,中间如果出现错误或者异常,就要考虑如何从出错的点恢复状态,恢复之后接着再读。

SeaTunnel 的状态恢复主要是通过  reader 把 未读取的 Split 信息存到 state里,而后引擎在读取时会定期对  State 做快照保留,我们恢复的时候就可以恢复最后一次快照,恢复后继续读取。

04

IoTDB Connector 数据写入解析


接下来是关于数据写入的解析。

01

数据类型映射



数据写入也涉及到数据类型映射,但这里与数据读取相反,是把 SeaTunnel的数据类型转换为IoTDB 类型。因为 IoTDB 只有 INT32 ,所以写入过程会涉及到 TINYINT 和 SMALLINT 的数据类型提升。其他的数据类型都是一对一可以转换的。ARRAY 和 VECTOR  数据类型的转换还暂时不支持。

上图为对应的代码,实现逻辑需要看我们具体的映射。

02

元数据动态注入


在写入这块也有原数据的注入的问题,SeaTunnel 支持元数据的动态注入。

当异构数据源写入 IoTDB 时,支持从每一行数据中提取 device、measurement、time,方法是通过序列化 SeaTunnelRow 时按配置提取固定列值。或者使用系统时间作为 time,如果未指定 time 列则填充当前系统时间;同时,还支持配置 storage group,自动附加到 device 前缀。


举例来说,假设在 SeaTunnel 读取上图所示的数据格式的 row 的结构,可以通过配置同步到IoTDB 中,获得的结果如下:


提取了我们需要的温度、湿度这两个列,并提取了 ts 和device name来作为IoTDB 的原数据。

03

批量提交与异常重试


另外,Sink在写入时需要做处理批量和重试。对于批量,我们可以配置相应的批量配置,包括支持配置批量提交的条数与间隔;如果数据缓存到内存,可以开启独立线程定时提交。

对于重试,SeaTunnel 支持配置重试次数,等待间隔与最大重试次数,以及当重试结束后,如果遇到不可恢复的错误也可以结束。

05

IoTDB Connector 使用示例


经过前面对读取数据和写入数据的解析之后,我们来看三个典型的使用场景示例。

01

从 IoTDB导出数据


第一个场景是从 IoTDB 导出数据,我这里举的例子是从IoTDB读取数据导到Console 上。
  • 并行读取,输出到 Console
    • 并行度:2
    • 批次数:24
    • 时间范围:2022-09-25 ~ 2022-09-26

我们假设在 IoTDB 有一张数据表,我们要把数据导到 Console 上,整个配置如上图所示,需要映射我们要导出的数据列以及查的时间范围。

这是一个最简单的示例,实际使用中可能  Sink 端更为复杂,需要参考对应数据源的文档做相应的配置。

02

导入数据到 IoTDB


  • 读取数据库,批量写入到 IoTDB
    • 批量写:每 1024 条或每 1000 ms 提交一次
    • 提取元数据 device、timestamp、measurement
    • 指定存储组:root.test_group

另外一个典型的使用场景是把其他数据源的数据导入到  IoTDB。假设我有一个外部的数据库表,有 ts、温度、湿度等列,我们将其导入到 IoTDB 中,要求有温度和湿度这两列,其他的可以不要。整个配置如上图所示,大家可以参考。

在 Sink 端主要是要指定  device 列的  Key,比如从哪些数据中提取 device,时间是从哪一个类提取,要写哪些列到 IoTDB 中等。

可以看到,我们可以配置  storage group,也就是 IoTDB 的存储组,可以通过storage group 指定存储组。

03

IoTDB 之间同步数据


第三个使用场景是在 IoTDB 与 IoTDB 之间同步数据,批量写入到 IoTDB。假设 IoTDB 中有一张表需要同步到另一个 IoTDB,同步过去之后存储组发生了变更,数据列的指标的名字也发生了变更,这时可以使用投影改写指标名称,并使用 SQL 改写存储组。

06

如何参与社区贡献


最后来说一下 IoTDB Connector 的下一步计划,以及大家如何来参与完善 Connector,贡献需要的新功能。

01

IoTDB Connector 的下一步计划


  • 支持读写 vector 数据类型读和写
  • 支持 tsfile 读写
  • 支持写 tsfile 再 load 到 IoTDB

SeaTunnel Connector 接入 issue:
https://github.com/apache/incubator-seatunnel/issues/3012

这是我们计划的 SeaTunnel Connector 接入 issue,上面有列出来功能列表以及待实现的 connector,大家感兴趣的话可以领取任务做贡献。

以下是 SeaTunnel 社区关于 Connector  接入贡献的一些入门指南:
  • 寻找你感兴趣的 issue:
    • https://github.com/apache/incubator-seatunnel/issues/2828
    • https://github.com/apache/incubator-seatunnel/issues/1946
    • https://github.com/apache/incubator-seatunnel/issues?q=is:open+is:issue+label:"help+wanted"

  • 参考贡献指南
    • https://github.com/apache/incubator-seatunnel/issues/2828
    • https://github.com/apache/incubator-seatunnel/pull/2995【贡献指南】
    • https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/setup.md 【编译指南】

  • 新手教程
    • https://mp.weixin.qq.com/s/evYRsrpPb-MXEm4DqLdnvQ【SeaTunnel 连接器极简开发流程】
    • https://mp.weixin.qq.com/s/8ivksAePmVZe2_2ldGi0AA 【新 API Connector 开发解析】
    • https://mp.weixin.qq.com/s/qpO2SVRl9KAY-Ib2kaTDig 【Source 与 Sink API 设计解析】

  • 参与讨论 & 寻求帮助
    • 在邮件列表、Slack 中讨论
    • 通过微信群沟通(如果没有加入请关注 SeaTunnel 公众号入群)
    • 参与 PR Review 发表你的见解

SeaTunnel Connector 接入激励计划已经进行到第二期了,我们对需要接入的 Connector  做了难易程度的划分,大家可以参与活动,完善  Connector 生态,支持的生态越广,大家用起来就会越方便,就可以支持更多的同步场景。

上图是部分需要大家一起贡献的  Connector,在此呼吁大家参与激励活动,攒积分,获大奖,活动刚刚开始,现在上车正好!

活动链接:https://github.com/apache/incubator-seatunnel/issues/3018

Apache SeaTunnel


Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

仓库地址: 
https://github.com/apache/incubator-seatunnel

网址:
https://seatunnel.apache.org/

Proposal:
https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro

Apache SeaTunnel(Incubating)  下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

能够进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才刚刚开始,但社区的发展壮大需要更多人的加入。我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/incubator-seatunnel/issues

贡献代码:
https://github.com/apache/incubator-seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

往期推荐




SeaTunnel 2.3.0-beta 重磅更新,自研同步引擎 SeaTunnel Engine 发布并支持更多连接器!




助力 Shopee 重构系统发布 ETL 功能,SeaTunnel 在电商巨头的实践经验




马蜂窝毕博:分析完这9点工作原理,我们最终选择了 Apache SeaTunnel!



点击“阅读原文”参与开源

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论