暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

​深入理解 Apache SeaTunnel:易用、高性能、支持实时流式和离线批处理的海量数据集成平台

SeaTunnel 2023-08-02
1351

一、认识Apache SeaTunnel

Apache SeaTunnel 是一个非常易用、高性能、支持实时流式和离线批处理的海量数据集成平台,架构于 Apache Spark 和 Apache Flink 之上,支持海量数据的实时同步与转换。
SeaTunnel专注于数据集成和数据同步,主要解决以下数据集成领域的常见问题:
数据源多样:常用的数据源有数百种,版本不兼容。随着新技术的出现,出现了更多的数据源。用户很难找到能够全面快速支持这些数据源的工具。
复杂同步场景:数据同步需要支持离线-全量同步、离线-增量同步、CDC、实时同步、全库同步等多种同步场景。
资源需求高:现有的数据集成和数据同步工具往往需要大量的计算资源或JDBC连接资源来完成海量小表的实时同步。这在一定程度上加重了企业的负担。
缺乏质量和监控:数据集成和同步过程经常会丢失或重复数据。同步过程缺乏监控,无法直观了解任务过程中数据的真实情况。
技术栈复杂:企业使用的技术组件各不相同,用户需要针对不同的组件开发相应的同步程序来完成数据集成。
管理维护困难:受限于不同的底层技术组件(Flink/Spark),离线同步和实时同步往往是分开开发和管理的,增加了管理和维护的难度。

二、系统架构、工作流程与特性

Apache SeaTunnel 系统架构图:
Input/Source[数据源输入] -> Filter/Transform[数据处理] -> Output/Sink[结果输出]

上图为 Apache SeaTunnel 的整个工作流程,数据处理流水线由多个过滤器构成,以满足多种数据处理需求。如果用户习惯了 SQL,也可以直接使用 SQL 构建数据处理管道,更加简单高效。目前,Apache SeaTunnel 支持的过滤器列表也在扩展中。
Apache SeaTunnel的特点:
丰富且可扩展的Connector:Apache SeaTunnel提供了不依赖于特定执行引擎的Connector API。基于此API开发的连接器(Source、Transform、Sink)可以运行在很多不同的引擎上,比如目前支持的SeaTunnel Engine、Flink、Spark。
Connector插件:插件式的设计让用户可以很方便的开发自己的Connector,并集成到 Apache SeaTunnel项目中。目前已支持 100 多个 Connector,而且数量还在激增。
批流融合:基于Apache SeaTunnel Connector API开发的Connector,完美兼容离线同步、实时同步、全量同步、增量同步等场景。大大降低了管理数据集成任务的难度。支持分布式快照算法,保证数据一致性。
多引擎支持:Apache SeaTunnel默认使用SeaTunnel Engine进行数据同步。同时,Apache SeaTunnel也支持使用Flink或Spark作为Connector的执行引擎,以适配企业现有的技术组件,支持多个版本的 Spark 和 Flink。
JDBC多路复用,数据库日志多表解析:Apache SeaTunnel支持多表或全库同步,解决了JDBC连接过多的问题;支持多表或全库日志读取和解析,解决了CDC多表同步场景需要重复读取和解析日志的问题。
高吞吐低延迟:Apache SeaTunnel支持并行读写,提供高吞吐低延迟稳定可靠的数据同步能力。
完善的实时监控:Apache SeaTunnel支持数据同步过程中每一步的详细监控信息,让用户轻松了解同步任务读写的数据量、数据 大小、QPS等信息。
支持两种作业开发方式:编码和画布设计:提供了作业的可视化管理、调度、运行和监控能力。

三、SeaTunnel工作架构

Apache SeaTunnel的运行过程如上图所示。
  • 用户配置作业信息,选择执行引擎提交作业。Source Connector负责并行读取数据并将数据发送给下游Transform或直接发送给Sink,Sink将数据写入目的地。值得注意的是,无论是Source还是Transform和Sink,都可以很方便的自行开发扩展。
  • SeaTunnel 是一个 EL(T) 数据集成平台。因此,在SeaTunnel中,Transform只能用于对数据进行一些简单的转换,例如将某列的数据转换为大写或小写,更改列名,或者将一列拆分为多列。
  • SeaTunnel 使用的默认引擎是SeaTunnel Engine。如果您选择使用Flink或Spark引擎,SeaTunnel会将Connector打包成Flink或Spark程序提交给Flink或Spark运行。
        Source Connectors SeaTunnel 支持从各种关系数据库、图形数据库、NoSQL 数据库、文档数据库和内存数据库中读取数据。HDFS等各种分布式文件系统。S3、OSS等多种云存储。同时我们也支持很多常见的SaaS服务的数据读取。 
        转换连接器如果源和接收器之间的架构不同,您可以使用转换连接器更改从源读取的架构,使其与接收器架构相同。
        Sink Connector SeaTunnel 支持向各种关系数据库、图数据库、NoSQL 数据库、文档数据库和内存数据库写入数据。HDFS等各种分布式文件系统。S3、OSS等多种云存储。同时我们也支持向很多常见的SaaS服务写入数据。

四、部署SeaTunnel


1.安装Java

安装Java8以上版本

2.下载SeaTunnel

    export version="2.3.1" 
    wget "https://archive.apache.org/dist/incubator/seatunnel/{version}-bin.tar.gz"
    tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"

    3.安装连接器

    从2.2.0-beta开始,二进制包默认不提供connector依赖,所以第一次使用时,我们需要执行如下命令安装connector:(当然你也可以手动下载connector从https://repo.maven.apache.org/maven2/org/apache/seatunnel/下载,然后手动移动到connectors/seatunnel目录)。
      sh bin/install-plugin.sh 2.3.1
      如果需要指定connector的版本,以2.3.0-beta为例,我们需要执行
        sh bin/install-plugin.sh 2.3.1
        通常你不需要所有的连接器插件,所以你可以通过配置指定你需要的插件config/plugin_config,比如你只需要connector-console插件,那么你可以修改plugin_config为
          --connectors-v2-- connector-console --end--
          如果你想让示例应用程序正常工作,你需要添加以下插件
            --connectors-v2-- connector-fake connector-console --end--
            您可以在${SEATUNNEL_HOME}/connectors/plugins-mapping.properties下找到所有支持的连接器和相应的 plugin_config 配置名称。
            提示:如果想通过手动下载connector的方式安装connector插件,需要特别注意以下几点
            connectors目录包含以下子目录,如果不存在,需要手动创建
              seatunnel
              如果想手动安装V2 connector插件,只需要下载自己需要的V2 connector插件,放到seatunnel目录下即可

              五、快速启动作业

              1.添加作业配置文件以定义

              编辑config/v2.batch.config.template,决定了seatunnel启动后数据输入、处理、输出的方式和逻辑。下面是一个配置文件的例子,和上面提到的例子应用是一样的。


                env {
                execution.parallelism = 1
                job.mode = "BATCH"
                }
                source {
                FakeSource {
                result_table_name = "fake"
                row.num = 16
                schema = {
                fields {
                name = "string"
                age = "int"
                }
                }
                }
                }
                sink {
                Console {}
                }


                2.运行 SeaTunnel

                可以通过以下命令启动应用程序
                  cd "apache-seatunnel-incubating-${version}"
                  ./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
                  查看输出:运行命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功与否的标志。
                  SeaTunnel 控制台会打印一些日志如下:


                    2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name, age2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
                    2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
                    2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
                    2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
                    2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
                    2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
                    2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438

                    六、SeaTunnel集成Flink


                    1.部署和配置Flink

                    下载Flink,Flink版本要求>=1.12.0
                    配置 SeaTunnel:更改设置config/seatunnel-env.sh,它基于您的引擎在部署时安装的路径。更改FLINK_HOME为 Flink 部署目录。

                    2.添加作业配置文件以定义

                    编辑config/v2.streaming.conf.template,决定了seatunnel启动后数据输入、处理、输出的方式和逻辑。下面是一个配置文件的例子,和上面提到的例子应用是一样的。


                      env {
                      execution.parallelism = 1
                      job.mode = "BATCH"
                      }
                      source {
                      FakeSource {
                      result_table_name = "fake"
                      row.num = 16
                      schema = {
                      fields {
                      name = "string"
                      age = "int"
                      }
                      }
                      }
                      }
                      sink {
                      Console {}
                      }


                      3.运行SeaTunnel

                      Flink1.12.x和Flink1.14.x
                        cd "apache-seatunnel-incubating-${version}"./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template

                        Flink1.15.x和Fflink1.16.x

                          cd "apache-seatunnel-incubating-${version}"
                          ./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template


                          查看输出:运行命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功与否的标志。
                          SeaTunnel 控制台会打印一些日志如下:
                            2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name, age2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
                            2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
                            2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
                            2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
                            2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
                            2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
                            2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438

                            七、SeaTunnel集成Spark


                            1.部署和配置Spark

                            下载Spark(要求版本>=2.4.0) 配置 SeaTunnel:
                            更改设置config/seatunnel-env.sh,它基于您的引擎在部署时安装的路径。更改SPARK_HOME为 Spark 部署目录。

                            2.添加作业配置文件以定义

                            编辑config/seatunnel.streaming.conf.template,决定了seatunnel启动后数据输入、处理、输出的方式和逻辑。下面是一个配置文件的例子,和上面提到的例子应用是一样的。
                              env {
                              execution.parallelism = 1
                              job.mode = "BATCH"
                              }
                              source {
                              FakeSource {
                              result_table_name = "fake"
                              row.num = 16
                              schema = {
                              fields {
                              name = "string"
                              age = "int"
                              }
                              }
                              }
                              }
                              sink {
                              Console {}
                              }


                              3.运行SeaTunnel

                              可以通过以下命令启动应用程序
                              Spark2.4.x
                                cd "apache-seatunnel-incubating-${version}"
                                ./bin/start-seatunnel-spark-2-connector-v2.sh
                                --master local[4]
                                --deploy-mode client
                                --config ./config/seatunnel.streaming.conf.template
                                Spark3.xx
                                  cd "apache-seatunnel-incubating-${version}"
                                  ./bin/start-seatunnel-spark-3-connector-v2.sh
                                  --master local[4]
                                  --deploy-mode client
                                  --config ./config/seatunnel.streaming.conf.template
                                  查看输出:运行命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功与否的标志。
                                  SeaTunnel 控制台会打印一些日志如下:
                                    fields : name, age
                                    types : STRING, INT
                                    row=1 : elWaB, 1984352560
                                    row=2 : uAtnp, 762961563
                                    row=3 : TQEIB, 2042675010
                                    row=4 : DcFjo, 593971283
                                    row=5 : SenEb, 2099913608
                                    row=6 : DHjkg, 1928005856
                                    row=7 : eScCM, 526029657
                                    row=8 : sgOeE, 600878991
                                    row=9 : gwdvw, 1951126920
                                    row=10 : nSiKE, 488708928
                                    row=11 : xubpl, 1420202810
                                    row=12 : rHZqb, 331185742
                                    row=13 : rciGD, 1112878259
                                    row=14 : qLhdI, 1457046294
                                    row=15 : ZTkRx, 1240668386
                                    row=16 : SGZCr, 94186144


                                    八、运行命令

                                    Spark2:
                                      bin/start-seatunnel-spark-2-connector-v2.sh --config config/v2.batch.config.template -m local -e client
                                      Spark3:
                                        bin/start-seatunnel-spark-3-connector-v2.sh --config config/v2.batch.config.template -m local -e client
                                        Flink13和Flink14:
                                          bin/start-seatunnel-flink-13-connector-v2.sh --config config/v2.batch.config.template
                                          Flink15和Flink16:
                                            bin/start-seatunnel-flink-15-connector-v2.sh --config config/v2.batch.config.template


                                            Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

                                            仓库地址: 
                                            https://github.com/apache/seatunnel

                                            网址:
                                            https://seatunnel.apache.org/


                                            Apache SeaTunnel 下载地址:
                                            https://seatunnel.apache.org/download
                                             
                                            衷心欢迎更多人加入!

                                            我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!


                                            我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

                                            提交问题和建议:
                                            https://github.com/apache/seatunnel/issues

                                            贡献代码:
                                            https://github.com/apache/seatunnel/pulls

                                            订阅社区开发邮件列表 : 
                                            dev-subscribe@seatunnel.apache.org

                                            开发邮件列表:
                                            dev@seatunnel.apache.org

                                            加入 Slack:
                                            https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

                                            关注 Twitter: 
                                            https://twitter.com/ASFSeaTunnel

                                            精彩推荐




                                            Apache SeaTunnel Connector 使用文档和使用案例有奖征稿来了!一起玩开源




                                            曝光!Apache SeaTunnel Catalog 功能设计为何能大大简化用户启用步骤?




                                            基于 Apache SeaTunnel 的 MySQL 到 Elasticsearch 实时同步解决方案



                                            点击在看点赞转发⭐️!

                                            文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                            评论