暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Apache SeaTunnel 插件开发最新经验总结,手慢无!

SeaTunnel 2024-01-25
283

在Apache SeaTunnel的最新插件开发中,Connector-v2 maxcompute 连接器实现了基于CatalogTable + SaveMode的新版本。

本文主要给大家分享了源端的关键改动包括弃用了过时的方法,改为通过CatalogTable实现数据传递。汇端则增加了对Multi-table sink和SaveMode接口的实现,并需手动管理auto service注解等经验方法!

此外,开发了特定的Catalog以处理公共参数和客户端操作。

Connector-v2 maxcompute 连接器

connector-v2 maxcompute 连接器
实现了基于CatalogTable + SaveMode
的新版本。

a) 源端的核心改动

  • 新版本中SeaTunnelSource
    继承自SatunnelPluginLifeCycle
    接口,其中包含了一个prepare
    方法。该方法原本用于初始化源端中的config
    参数,以及一些预处理工作,如删除数据、执行SQL等。
  • SeaTunnelSource
    中的getProducedType
    方法用于存储SeatunnelRowType
    ,即列映射转换信息的覆写。

改动如下:

  • 不再使用两个已过时的方法。
  • 在构造函数中创建CatalogTable
    ,并在新方法List<CatalogTable> getProducedCatalogTables()
    中实现catalog
    的数据传递。在新版本中,catalogTable
    将从源端传递到汇端,作为后续saveMode
    里批量同步建表的SQL拼接来源。
  • SeatunnelRowType
    可以从catalogTable
    中获取,以适配现有连接器,也可以自己封装公共参数进行传递。

关于Catalog:

  • 我们通过getTable
    获取CatalogTable
    。如果配置了自定义列(column
    ),以适配DataX,我们需要基于column
    生成CatalogTable

b) Sink端的核心改动

MaxcomputeSinkFactory
中,额外实现了createSink
方法,从上下文中获取catalog
ReadonlyConfig


Sink
需要去除auto service
注解,并实现multi-table sink
saveMode
接口。


基于DefaultSaveModeHandler
,通过schemaSaveMode
DataSaveMode
配置,利用catalog
中的方法实现数据清理、建表或个性化操作。

catalog
实现了truncate table
接口,用于数据删除和分区重建。

变动见此图


去除auto_service
注解的原因:

  • 如果某个连接器汇端(例如maxcompute)已经实现了新模式接口,不再通过过时的prepare
    方法注入config
    ,而是通过构造函数从上下文传入的catalogtable
    获取内部seatunnelRowType
    的适配。这个连接器在排序上比较靠前,AbstractPluginDiscovery
    在服务加载时总是首先发现它。但是,如果其他插件(例如clickhouse)仍在使用旧模式(prepare
    ),则MultipleTableJobConfigParser
    解析汇端会将其视为过时插件,并回退到JobConfigParser
    进行适配。在parseSink
    方法中,AbstractPluginDiscovery
    仍会扫描所有通过autoService
    注解注入的SeaTunnelSink
    插件。此时,它遇到的第一个插件maxcompute的构造函数不能适配旧模式的实例初始化,导致失败。
  • 这导致了Maxcompute到Clickhouse数据同步时,报出MaxcomputeSink无法初始化的问题。

  • MaxcomputeSinkWriter
    需要额外实现SupportMutiTableSinkWriter
    ,否则无法进行适当的转换。

如果要指定部分列回流:

  • 必须比较catalogTable
    中的列数量与指定的自定义列数量,两者相等才可以进行回流。
  • truncate -> data_save_mode / schema_save_mode
  • 支持列数据的隐式转换(seatunnel row
    数据是一个Object[]
    数组,需要结合CatalogTable TableSchema + Opds TableSchema

c) 开发特定的catalog

需要实现公共Catalog
接口,其中的open
close
可以处理一些公共参数和客户端操作。

catalog
需要通过catalog factory
进行实例化,并通过auto service
进行装配加载。


新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南
从 0 到 1 快速入门 Apache SeaTunnel 
初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel

 MySQL 同步到 Hive / 从MySQL同步到StarRocks
通过 SeaTunnel 将数据写入 OSS-HDFS 
MySQL 到 Elasticsearch 实时同步解决方案

启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 
 部署 Apache SeaTunnel 分布式集群
Apache SeaTunnel Web部署指南
基于Apache SeaTunnel构建CDC数据同步管道

最佳实践

 OPPO 清风 天翼云 马蜂窝
孩子王 哔哩哔哩 唯品会

测试报告


 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!
最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!
比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布
SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台


仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论