暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Vertica Sink Connector 使用必看手册

SeaTunnel 2023-09-08
867

@

点击蓝字

关注我们

文档贡献 | 高俊
翻译编辑 | Debra Chen


01




支持引擎

Spark
Flink
SeaTunnel Zeta


02




关键特性


✅精确一次性处理
❎CDC
使用 Xa transactions
 保证 精确一次性处理
. 所以 精确一次性处理
 仅可用于支持Xa transactions
的数据库. 可以设置 is_exactly_once=true
来使用.


03




描述

数据通过 JDBC 写入。支持批处理模式和流式模式,支持并发写入,支持使用 XA 事务保证的精确一次语义。

04




支持数据源

数据源
支持版本驱动器
URL
Maven
Vertica
不同的依赖版本可能会有不同的驱动程序类
com.vertica.jdbc.Driver
jdbc:vertica://localhost:5433/vertica
下载


05




数据库依赖


下载与'Maven'对应的支持列表,并将其复制到'$SEATNUNNEL_HOME/plugins/jdbc/lib/'工作目录。

例如 Vertica datasource: cp vertica-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/


06




数据类型映射

Vertica数据类型SeaTunnel数据类型
BIT(1)
INT UNSIGNED
BOOLEAN
TINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR
INT
INT UNSIGNED
INTEGER UNSIGNED
BIGINT
BIGINT
BIGINT UNSIGNED
DECIMAL(20,0)
DECIMAL(x,y)(Get the designated column's specified column size.<38)
DECIMAL(x,y)
DECIMAL(x,y)(Get the designated column's specified column size.>38)
DECIMAL(38,18)
DECIMAL UNSIGNED
DECIMAL((Get the designated column's specified column size)+1,
(Gets the designated column's number of digits to right of the decimal point.)))
FLOAT
FLOAT UNSIGNED
FLOAT
DOUBLE
DOUBLE UNSIGNED
DOUBLE
CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
JSON
STRING
DATE
DATE
TIME
TIME
DATETIME
TIMESTAMP
TIMESTAMP
TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
BINARY
VARBINAR
BIT(n)
BYTES
GEOMETRY
UNKNOWN
Not supported yet

07




Sink选项

名称类型要求默认值描述
url字符串必填-JDBC 连接的 URL。例如:jdbc:vertica://localhost:5433/vertica
driver字符串必填-用于连接到远程数据源的 JDBC 类名,如果使用 Vertica,则值为 com.vertica.jdbc.Driver
user字符串可选-连接实例的用户名
password字符串可选-连接实例的密码
query字符串可选-使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...
query
 具有更高的优先级。
database字符串可选-使用此 database
 和 table-name
 自动生成 SQL,并接收上游输入数据写入数据库。此选项与 query
 互斥,并具有更高的优先级。
table字符串可选-使用数据库和此表名自动生成 SQL,接收上游输入数据写入数据库。此选项与 query
 互斥,并具有更高的优先级。
primary_keys数组可选-用于在自动生成 SQL 时支持 insert
delete
 和 update
 操作。
support_upsert_by_query_primary_key_exist布尔可选false根据查询主键是否存在选择使用 INSERT SQL、UPDATE SQL 处理更新事件(INSERT、UPDATE_AFTER)。此配置仅在数据库不支持 upsert 语法时使用。请注意,此方法性能较低。
connection_check_timeout_sec整数可选30等待用于验证连接的数据库操作完成的时间(以秒为单位)。
max_retries整数可选0提交失败(executeBatch)的重试次数。
batch_size整数可选1000用于批量写入的缓冲记录达到 batch_size
数量或时间达到 batch_interval_ms
 时,数据将刷新到数据库。
batch_interval_ms整数可选1000用于批量写入的缓冲记录达到 batch_size
数量或时间达到 batch_interval_ms
 时,数据将刷新到数据库。
is_exactly_once布尔可选false是否启用精确一次性语义,将使用 XA 事务。如果启用,需要设置 xa_data_source_class_name
generate_sink_sql布尔可选false基于要写入的数据库表生成 SQL 语句。
xa_data_source_class_name字符串可选-数据库驱动程序的 XA 数据源类名,例如,Vertica 为 com.vertica.jdbc.VerticaXADataSource
。其他数据源请参考附录。
max_commit_attempts整数可选3事务提交失败的重试次数。
transaction_timeout_sec整数可选-1事务打开后的超时时间,默认为 -1(永不超时)。请注意,设置超时可能会影响精确一次性语义。
auto_commit布尔可选true默认启用自动事务提交。
common-options
可选-Sink 插件的通用参数,请参考 Sink Common Options 获取详细信息。

Tips

如果未设置 partition_column
,将以单并发方式运行;如果设置了 partition_column
,将根据任务的并发度并行执行。


08




任务示例


简单示例:
该示例定义了一个 SeaTunnel 同步任务,它通过 FakeSource 自动生成数据并将其发送到 JDBC Sink。FakeSource 生成了总共 16 行数据(row.num=16),每行有两个字段,name(字符串类型)和 age(整数类型)。最终目标表是 test_table,表中也会有 16 行数据。在运行此作业之前,您需要在您的 Vertica 数据库中创建数据库 test 和表 test_table。如果您尚未安装和部署 SeaTunnel,请按照安装 SeaTunnel中的说明进行安装和部署。然后按照SeaTunnel Engine 快速入门(https://chat.openai.com/start-v2/locally/quick-start-seatunnel-engine.md)中的说明运行此作业。
# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 1
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}

transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform-v2
}

sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}


生成Sink SQL:
这个示例不需要编写复杂的SQL语句,您可以配置数据库名称和表名称,以自动为您生成添加语句。
sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
user = "root"
password = "123456"
# Automatically generate sql statements based on database table names
generate_sink_sql = true
database = test
table = test_table
}
}


精确一次语义:
对于精确写入场景,我们保证精确一次性。
sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"

max_retries = 0
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"

is_exactly_once = "true"

xa_data_source_class_name = "com.vertical.cj.jdbc.VerticalXADataSource"
}
}


Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

精彩推荐




Apache SeaTunnel Connector 使用文档和使用案例有奖征稿来了!一起玩开源




JDBC Vertica Source Connector 使用文档




讲师征集令 | Apache SeaTunnel Meetup 分享嘉宾火热招募中!



一键三连-点赞在看转发⭐️!


文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论