暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何基于 Apache SeaTunnel 从 Oracle 读取数据

SeaTunnel 2024-10-31
759

引言

在大数据时代,企业面临着数据的快速增长和多样化需求,如何高效地处理和整合来自不同数据源的数据成为了关键问题。

Apache SeaTunnel作为一款开源数据集成工具,提供了灵活的数据处理和实时数据同步能力,广泛应用于数据仓库、数据湖及实时分析场景中。与此同时,Oracle 数据库以其高性能和可靠性,成为许多企业数据存储的首选。结合 Apache SeaTunnel 与 Oracle 数据库,可以实现高效的数据迁移与转换。

Apache SeaTunnel

Apache SeaTunnel 是一款数据集成工具,支持批处理和流处理。它允许用户通过简单的配置实现数据的抽取、转换和加载(ETL)。SeaTunnel 提供了多种连接器,能够轻松集成不同的数据源和目标,包括关系型数据库、NoSQL 数据库、文件系统等。其灵活性和扩展性使其成为企业数据集成的重要选择。

Oracle 数据库

Oracle 数据库是一款广泛使用的关系型数据库管理系统,以其强大的事务处理能力和安全性而著称。它适用于处理大型企业级应用程序的数据存储需求。Oracle 支持多种数据类型和复杂查询,能够高效地管理和分析大量数据,适合用于金融、电信、医疗等行业。

Oracle JDBC 连接器概述

连接器描述

Oracle JDBC 连接器通过 JDBC 方式读取外部数据源的数据,支持 Apache SeaTunnel 的多种引擎,包括 Spark、Flink 和 SeaTunnel Zeta。

使用依赖

Spark/Flink 引擎

  1. 确保将 JDBC 驱动 jar 包 放置在 ${SEATUNNEL_HOME}/plugins/
    目录中。
  2. 为支持国际化字符集,复制 orai18n.jar
    ${SEATUNNEL_HOME}/plugins/
    目录。

SeaTunnel Zeta 引擎

  1. 确保将 JDBC 驱动 jar 包 放置在 ${SEATUNNEL_HOME}/lib/
    目录中。
  2. 为支持国际化字符集,复制 orai18n.jar
    ${SEATUNNEL_HOME}/lib/
    目录。

主要特性

  • 批处理支持
  • 精确一次(exactly-once)语义
  • 列投影(column projection)
  • 并行处理(parallelism)
  • 支持用户定义的分割(user-defined split)

支持的数据源信息

数据源支持版本驱动URLMaven
Oracle依赖版本不同有不同驱动类oracle.jdbc.OracleDriverjdbc:oracle:thin:@datasource01:1523:xehttps://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8

数据库依赖

请下载对应于 Maven 的支持列表,并将其复制到 ${SEATUNNEL_HOME}/plugins/jdbc/lib/
工作目录中。

例如 Oracle 数据源:

cp ojdbc8-xxxxxx.jar $SEATUNNEL_HOME/lib/

为支持国际化字符集,复制 orai18n.jar
${SEATUNNEL_HOME}/lib/
目录中。

数据类型映射

Oracle 数据类型SeaTunnel 数据类型
INTEGERDECIMAL(38,0)
FLOATDECIMAL(38,18)
NUMBER(precision <= 9, scale == 0)INT
NUMBER(9 < precision <= 18, scale == 0)BIGINT
NUMBER(18 < precision, scale == 0)DECIMAL(38,0)
NUMBER(scale != 0)DECIMAL(38,18)
BINARY_DOUBLEDOUBLE
BINARY_FLOAT
REAL
FLOAT
CHAR
NCHAR
VARCHAR
NVARCHAR2
VARCHAR2
LONG
ROWID
NCLOB
CLOB
XML
STRING
DATETIMESTAMP
TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE
TIMESTAMP
BLOB
RAW
LONG RAW
BFILE
BYTES

源选项

名称类型是否必需默认值描述
urlString-JDBC 连接的 URL。例如:jdbc:oracle:thin:@datasource01:1523:xe
driverString-用于连接远程数据源的 JDBC 类名,例如 oracle.jdbc.OracleDriver
userString-连接实例的用户名
passwordString-连接实例的密码
queryString-查询语句
connection_check_timeout_secInt30验证连接时的超时时间(秒)
partition_columnString-用于并行处理的分区列名,仅支持数值类型主键
partition_lower_boundBigDecimal-扫描的 partition_column 最小值
partition_upper_boundBigDecimal-扫描的 partition_column 最大值
partition_numIntjob parallelism分区数量,仅支持正整数,默认值为作业并行性
fetch_sizeInt0查询时的行抓取大小,0 表示使用 JDBC 默认值
propertiesMap-其他连接配置参数
名称类型是否必需默认值描述
urlString-JDBC 连接的 URL,例如:jdbc:mysql://localhost:3306/test
driverString-用于连接远程数据源的 JDBC 类名,例如 MySQL 为 com.mysql.cj.jdbc.Driver
userString-连接实例的用户名
passwordString-连接实例的密码
queryString-查询语句
connection_check_timeout_secInt30验证连接时的超时时间(秒)
partition_columnString-用于并行处理的分区列名,仅支持数值类型主键,且只能配置一列。
partition_lower_boundBigDecimal-扫描的 partition_column
最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_upper_boundBigDecimal-扫描的 partition_column
最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_numIntjob parallelism分区数量,仅支持正整数,默认值为作业并行性。
fetch_sizeInt0查询时的行抓取大小,0 表示使用 JDBC 默认值。
propertiesMap-其他连接配置参数,当 properties
和 URL 有相同参数时,优先级由具体驱动的实现决定。
table_pathString-表的完整路径,可以使用该配置替代 query
。例如: MySQL 为 testdb.table1
,Oracle 为 test_schema.table1
table_listArray-要读取的表列表,可以使用该配置替代 table_path
。例如: [{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select * id, name from testdb.table2"}]
where_conditionString-所有表/查询的通用行过滤条件,必须以 where
开头,例如 where id > 100
split.sizeInt8096表的拆分大小(行数),捕获的表在读取时被拆分为多个部分。
split.even-distribution.factor.lower-boundDouble0.05块键分布因子的下限,用于判断表数据是否均匀分布。
split.even-distribution.factor.upper-boundDouble100块键分布因子的上限,用于判断表数据是否均匀分布。
split.sample-sharding.thresholdInt10000触发样本分片策略的估计分片数量阈值。
split.inverse-sampling.rateInt1000样本分片策略中使用的采样率的反值。
common-options
-源插件通用参数,详细信息请参考 Source Common Options。

并行读取

JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用一定规则拆分表中的数据,并将其交给读取器进行读取。读取器的数量由 parallelism
选项决定。

拆分键规则

  1. 如果 partition_column
    不为空,则使用该列进行计算。该列必须为 支持的拆分数据类型
  2. 如果 partition_column
    为空,SeaTunnel 将读取表的模式并获取主键和唯一索引。如果主键和唯一索引中有多个列,则使用第一个支持的拆分数据类型列进行拆分。例如,表的主键为 (guid, name)
    ,因为 guid
    不在 支持的拆分数据类型 中,因此使用 name
    列进行拆分。

支持的拆分数据类型

  • String
  • Number (int, bigint, decimal, ...)
  • Date

拆分相关选项

split.size

每个拆分中包含的行数,捕获的表在读取时被拆分为多个部分。

split.even-distribution.factor.lower-bound

不推荐使用

块键分布因子的下限。用于判断表数据是否均匀分布。如果计算出的分布因子大于或等于该下限(即 (MAX(id) - MIN(id) + 1) row count
),则表块将被优化为均匀分布。否则,如果分布因子小于下限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold
指定的值,将使用基于采样的分片策略。默认值为 0.05。

split.even-distribution.factor.upper-bound

不推荐使用

块键分布因子的上限。用于判断表数据是否均匀分布。如果计算出的分布因子小于或等于该上限(即 (MAX(id) - MIN(id) + 1) / row count
),则表块将被优化为均匀分布。否则,如果分布因子大于上限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold
指定的值,将使用基于采样的分片策略。默认值为 100.0。

split.sample-sharding.threshold

此配置指定估计分片数量的阈值,以触发样本分片策略。当分布因子超出指定的上下限时,且估计的分片数量(计算为近似行数 / 块大小)超过此阈值时,将使用样本分片策略。这可以帮助更高效地处理大型数据集。默认值为 1000 个分片。

split.inverse-sampling.rate

样本分片策略中使用的采样率的反值。例如,如果该值设置为 1000,则在采样过程中应用 1/1000 的采样率。此选项提供灵活性,以控制采样的粒度,从而影响最终的分片数量。尤其在处理非常大的数据集时,较低的采样率更为合适。默认值为 1000。

partition_column [string]

拆分数据的列名。

partition_upper_bound [BigDecimal]

扫描的 partition_column
最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。

partition_lower_bound [BigDecimal]

扫描的 partition_column
最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。

partition_num [int]

不推荐使用,正确的方法是通过 split.size
控制拆分的数量。

我们需要拆分成多少个块,仅支持正整数,默认值为作业并行性。

小贴士

如果表无法拆分(例如,表没有主键或唯一索引,且未设置 partition_column
),则将以单一并发执行。

使用 table_path
替代 query
进行单表读取。如果需要读取多个表,使用 table_list

示例任务

以下是一些使用 JDBC 源连接器的任务示例:

简单示例

该示例在 TEST_TABLE
表中查询所有字段,您还可以指定要查询哪些字段以最终输出到控制台。

env {
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = "root"
password = "123456"
query = "SELECT * FROM TEST_TABLE"
}
}

transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
Console {}
}

partition_column
并行读取

使用配置的分片字段和分片数据并行读取查询表,如果想读取整个表,可以使用此方法。

env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        partition_column = "ID"
        partition_num = 10
    }
}
sink {
  Console {}
}

按主键或唯一索引并行读取

通过配置 table_path
开启自动分片,可以配置 split.* 来调整分片策略。

env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        table_path = "DA.SCHEMA1.TABLE1"
        query = "select * from SCHEMA1.TABLE1"
        split.size = 10000
    }
}
sink {
  Console {}
}

并行读取

根据配置的上下限更高效地读取数据。

source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        partition_column = "ID"
        partition_lower_bound = 1
        partition_upper_bound = 500
        partition_num = 10
    }
}

多表读取

配置 table_list
将开启自动拆分,以配置split.
来调整分割策略*。

env {
  job.mode = "BATCH"
  parallelism = 4
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    "table_list"=[
        {
            "table_path"="XE.TEST.USER_INFO"
        },
        {
            "table_path"="XE.TEST.YOURTABLENAME"
        }
    ]
    #where_condition= "where id > 100"
    split.size = 10000
    #split.even-distribution.factor.upper-bound = 100
    #split.even-distribution.factor.lower-bound = 0.05
    #split.sample-sharding.threshold = 1000
    #split.inverse-sampling.rate = 1000
  }
}

sink {
  Console {}
}

总结

Apache SeaTunnel为连接和集成Oracle数据库提供了灵活的解决方案。通过简单的配置,用户可以高效地从Oracle数据库读取和处理数据,满足不同的业务需求。

无论是在实时数据处理还是批量数据集成场景中,SeaTunnel都能为用户带来显著的便利和高效。


同步Demo

 MySQL→Doris
MySQLCDC
MySQL→Hive
 HTTP → Doris 
HTTP → MySQL

新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南
从 0 到 1 快速入门 Apache SeaTunnel 
初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel

 MySQL 同步到 Hive / 从MySQL同步到StarRocks
通过 SeaTunnel 将数据写入 OSS-HDFS 
MySQL 到 Elasticsearch 实时同步解决方案

启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 
 部署 Apache SeaTunnel 分布式集群
Apache SeaTunnel Web部署指南
基于Apache SeaTunnel构建CDC数据同步管道
【用户投稿】Apache SeaTunnel 2.3.3+Web 1.0.0版本安装部署
【安装部署】Apache SeaTunnel 和 Web快速安装详解
【保姆级教程】使用SeaTunnel同步Kafka的数据到ClickHouse
【数据同步】SeaTunnel初体验,5000字深入浅出带你用上Oracle-CDC

最佳实践

 OPPO 清风 天翼云 马蜂窝
孩子王 哔哩哔哩 唯品会
众安保险 兆原数通 亚信科技
映客 翼康济世 信也科技
华润置地

测试报告


 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!
最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!
比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布
SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

源码解析


Apache SeaTunnel Zeta引擎源码解析(一) Server端的初始化
Apache SeaTunnel Zeta引擎源码解析(二) Client端的任务提交流程
Apache SeaTunnel Zeta引擎源码解析(三) Server端接收任务的执行流程
从启动到关闭 | SeaTunnel2.1.1源码解析
SeaTunnel 2.1.2 封装 Flink 连接数据库的源码解析
那些年,我们在Apache SeaTunnel 2.1.0部署中踩过的坑【含源码分析】


Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台


仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论