暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

JDBC SQL Server Sink Connector: 一览与实践

SeaTunnel 2023-10-25
474

在大数据时代,数据的迁移和流动已经变得日益重要。为了使数据能够更加高效地从一个源流向另一个目标,我们需要可靠、高效和易于配置的工具。今天,我们将介绍 JDBC SQL Server Sink Connector,这是一个专为 SQL Server 设计的连接器,能够确保数据的精准、高效传输。

不仅如此,它还支持多种流处理引擎,例如 Spark、Flink 和 SeatTunnel Zeta。无论您是初学者还是有经验的开发者,本文都将为您提供关于如何最大限度地利用此连接器的深入见解。

JDBC SQL Server Sink Connector

支持 SQL Server 版本 

  • • 服务器:2008(或更高版本,仅供信息参考)

支持的引擎 

Spark
Flink
Seatunnel Zeta

主要特点 

  • • [x] 精准一次性

  • • [x] CDC(变更数据捕获)

使用 Xa 事务
 来确保 精准一次性
。因此,仅支持支持 Xa 事务
 的数据库的 精准一次性
。您可以设置 is_exactly_once=true
 来启用它。

 描述 

通过 JDBC 写入数据。支持批处理模式和流处理模式,支持并发写入,支持精准一次性语义(使用 XA 事务保证)。

支持的数据源信息 

数据源支持的版本驱动URLMaven
SQL Server支持版本 >= 2008com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433下载

 数据库依赖 

请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录
例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

 数据类型映射 

SQL Server 数据类型Seatunnel 数据类型
BITBOOLEAN
TINYINT
SMALLINT
SHORT
INTEGERINT
BIGINTLONG
DECIMAL
NUMERIC
MONEY
SMALLMONEY
DECIMAL((指定列的指定列大小)+1,
(获取指定列的小数点右边的数字的数量。)))
REALFLOAT
FLOATDOUBLE
CHAR
NCHAR
VARCHAR
NTEXT
NVARCHAR
TEXT
STRING
DATELOCAL_DATE
TIMELOCAL_TIME
DATETIME
DATETIME2
SMALLDATETIME
DATETIMEOFFSET
LOCAL_DATE_TIME
TIMESTAMP
BINARY
VARBINARY
IMAGE
UNKNOWN
尚不支持

 Sink 选项 

名称类型必需默认值描述
url字符串-JDBC 连接的 URL。例如:jdbc:sqlserver://localhost:1433;databaseName=mydatabase
driver字符串-用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
user字符串-连接实例的用户名
password字符串-连接实例的密码
query字符串-使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...
query
 具有更高的优先级
database字符串-使用此 database
 和 table-name
 自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query
 互斥,优先级更高。
table字符串-使用数据库和此表名自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query
 互斥,优先级更高。
primary_keys数组-此选项用于支持自动生成 SQL 时的 insert
delete
 和 update
 等操作。
support_upsert_by_query_primary_key_exist布尔false选择是否使用 INSERT SQL、UPDATE SQL 来处理基于查询主键是否存在的更新事件(INSERT、UPDATE_AFTER)。只有在数据库不支持 upsert 语法时才使用此配置。注意:此方法性能较低。
connection_check_timeout_sec整数30等待用于验证连接的数据库操作完成的秒数。
max_retries整数0重试提交失败(executeBatch)的次数。
batch_size整数1000用于批量写入的记录数量达到 batch_size
 或时间达到 checkpoint.interval
 时,数据将刷新到数据库。
is_exactly_once布尔false是否启用精准一次性语义,将使用 XA 事务。如果开启,需要设置 xa_data_source_class_name
generate_sink_sql布尔false基于要写入的数据库表生成 SQL 语句。
xa_data_source_class_name字符串-数据库驱动程序的 XA 数据源类名,例如,SQL Server 为 com.microsoft.sqlserver.jdbc.SQLServerXADataSource
,其他数据源请参考附录。
max_commit_attempts整数3事务提交失败的重试次数。
transaction_timeout_sec整数-1事务打开后的超时时间,默认值为 -1(永不超时)。请注意,设置超时可能会影响精准一次性语义。
auto_commit布尔true默认启用自动事务提交。
common-options
-Sink 插件通用参数,请参考 Sink Common Options 以获取详细信息。

提示 

如果未设置 partition_column
,则将以单一并发运行;如果设置了 partition_column
,则将根据任务的并发度执行并行操作。

 任务示例 

简单:

这是一个读取 Sql Server 数据并将其直接插入另一个表中的示例

env {
  # 您可以在此处设置引擎配置
  execution.parallelism = 10
}
source {
  # 这是一个示例源插件,仅用于测试和演示源插件的功能
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "select * from column_type_test.dbo.full_types_jdbc"
    # 并行分片读取字段
    partition_column = "id"
    # 片段数量
    partition_num = 10
  }
}

transform {

  # 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"

  }  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}

CDC(Change Data Capture)事件

我们还支持 CDC 变更数据,此时需要配置数据库、表和主键。

Jdbc {
  source_table_name = "customers"
  driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
  url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  user = SA
  password = "Y.sa123456"
  generate_sink_sql = true
  database = "column_type_test"
  table = "dbo.full_types_sink"
  batch_size = 100
  primary_
keys = ["id"]
}

精确一次性 Sink

事务性写入可能会更慢,但对数据更准确


  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
    is_exactly_once = "true"

    xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource"

  }  # 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
  # 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc


新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南

从 0 到 1 快速入门 Apache SeaTunnel 

初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel


 MySQL 同步到 Hive / 从MySQL同步到StarRocks

通过 SeaTunnel 将数据写入 OSS-HDFS 

MySQL 到 Elasticsearch 实时同步解决方案


启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 

 部署 Apache SeaTunnel 分布式集群

 

最佳实践

 OPPO 清风 天翼云 马蜂窝

孩子王 哔哩哔哩 唯品会


测试报告

 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!

最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!

比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布


Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台


仓库地址: 

https://github.com/apache/seatunnel


网址:

https://seatunnel.apache.org/


Apache SeaTunnel 下载地址:

https://seatunnel.apache.org/download

 

衷心欢迎更多人加入!


我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!


我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!


提交问题和建议:

https://github.com/apache/seatunnel/issues


贡献代码:

https://github.com/apache/seatunnel/pulls


订阅社区开发邮件列表 : 

dev-subscribe@seatunnel.apache.org


开发邮件列表:

dev@seatunnel.apache.org


加入 Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ


关注 Twitter: 

https://twitter.com/ASFSeaTunnel

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论