
@
点击蓝字
关注我们

01
支持引擎
Spark
Flink
SeaTunnel Zeta
02
关键特性
使用 Xa transactions
保证精确一次性处理
. 所以精确一次性处理
仅可用于支持Xa transactions
的数据库. 可以设置is_exactly_once=true
来使用.
03
描述
04
支持数据源
| 数据源 | 支持版本 | 驱动器 | URL | Maven |
05
数据库依赖
下载与'Maven'对应的支持列表,并将其复制到'$SEATNUNNEL_HOME/plugins/jdbc/lib/'工作目录。 例如 Vertica datasource: cp vertica-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/
06
数据类型映射
| Vertica数据类型 | SeaTunnel数据类型 |
INT UNSIGNED | |
TINYINT UNSIGNED SMALLINT SMALLINT UNSIGNED MEDIUMINT MEDIUMINT UNSIGNED INT INTEGER YEAR | |
INTEGER UNSIGNED BIGINT | |
(Gets the designated column's number of digits to right of the decimal point.))) | |
FLOAT UNSIGNED | |
DOUBLE UNSIGNED | |
VARCHAR TINYTEXT MEDIUMTEXT TEXT LONGTEXT JSON | |
TIMESTAMP | |
MEDIUMBLOB BLOB LONGBLOB BINARY VARBINAR BIT(n) | |
UNKNOWN |
07
Sink选项
| 名称 | 类型 | 要求 | 默认值 | 描述 |
|---|---|---|---|---|
| url | 字符串 | 必填 | - | JDBC 连接的 URL。例如:jdbc:vertica://localhost:5433/vertica |
| driver | 字符串 | 必填 | - | 用于连接到远程数据源的 JDBC 类名,如果使用 Vertica,则值为 com.vertica.jdbc.Driver。 |
| user | 字符串 | 可选 | - | 连接实例的用户名 |
| password | 字符串 | 可选 | - | 连接实例的密码 |
| query | 字符串 | 可选 | - | 使用此 SQL 将上游输入数据写入数据库。例如 INSERT ..., query具有更高的优先级。 |
| database | 字符串 | 可选 | - | 使用此 database和 table-name自动生成 SQL,并接收上游输入数据写入数据库。此选项与 query互斥,并具有更高的优先级。 |
| table | 字符串 | 可选 | - | 使用数据库和此表名自动生成 SQL,接收上游输入数据写入数据库。此选项与 query互斥,并具有更高的优先级。 |
| primary_keys | 数组 | 可选 | - | 用于在自动生成 SQL 时支持 insert、 delete和 update操作。 |
| support_upsert_by_query_primary_key_exist | 布尔 | 可选 | false | 根据查询主键是否存在选择使用 INSERT SQL、UPDATE SQL 处理更新事件(INSERT、UPDATE_AFTER)。此配置仅在数据库不支持 upsert 语法时使用。请注意,此方法性能较低。 |
| connection_check_timeout_sec | 整数 | 可选 | 30 | 等待用于验证连接的数据库操作完成的时间(以秒为单位)。 |
| max_retries | 整数 | 可选 | 0 | 提交失败(executeBatch)的重试次数。 |
| batch_size | 整数 | 可选 | 1000 | 用于批量写入的缓冲记录达到 batch_size数量或时间达到 batch_interval_ms时,数据将刷新到数据库。 |
| batch_interval_ms | 整数 | 可选 | 1000 | 用于批量写入的缓冲记录达到 batch_size数量或时间达到 batch_interval_ms时,数据将刷新到数据库。 |
| is_exactly_once | 布尔 | 可选 | false | 是否启用精确一次性语义,将使用 XA 事务。如果启用,需要设置 xa_data_source_class_name。 |
| generate_sink_sql | 布尔 | 可选 | false | 基于要写入的数据库表生成 SQL 语句。 |
| xa_data_source_class_name | 字符串 | 可选 | - | 数据库驱动程序的 XA 数据源类名,例如,Vertica 为 com.vertica.jdbc.VerticaXADataSource。其他数据源请参考附录。 |
| max_commit_attempts | 整数 | 可选 | 3 | 事务提交失败的重试次数。 |
| transaction_timeout_sec | 整数 | 可选 | -1 | 事务打开后的超时时间,默认为 -1(永不超时)。请注意,设置超时可能会影响精确一次性语义。 |
| auto_commit | 布尔 | 可选 | true | 默认启用自动事务提交。 |
| common-options | 可选 | - | Sink 插件的通用参数,请参考 Sink Common Options 获取详细信息。 |
Tips
如果未设置 partition_column
,将以单并发方式运行;如果设置了partition_column
,将根据任务的并发度并行执行。
08
任务示例
简单示例:
该示例定义了一个 SeaTunnel 同步任务,它通过 FakeSource 自动生成数据并将其发送到 JDBC Sink。FakeSource 生成了总共 16 行数据(row.num=16),每行有两个字段,name(字符串类型)和 age(整数类型)。最终目标表是 test_table,表中也会有 16 行数据。在运行此作业之前,您需要在您的 Vertica 数据库中创建数据库 test 和表 test_table。如果您尚未安装和部署 SeaTunnel,请按照安装 SeaTunnel中的说明进行安装和部署。然后按照SeaTunnel Engine 快速入门(https://chat.openai.com/start-v2/locally/quick-start-seatunnel-engine.md)中的说明运行此作业。
# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 1
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform-v2
}
sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
这个示例不需要编写复杂的SQL语句,您可以配置数据库名称和表名称,以自动为您生成添加语句。
sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
user = "root"
password = "123456"
# Automatically generate sql statements based on database table names
generate_sink_sql = true
database = test
table = test_table
}
}
对于精确写入场景,我们保证精确一次性。
sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
max_retries = 0
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
is_exactly_once = "true"
xa_data_source_class_name = "com.vertical.cj.jdbc.VerticalXADataSource"
}
}
Apache SeaTunnel
精彩推荐
一键三连-点赞在看转发⭐️!

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




