暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SeaTunnel引擎下的SQL Server CDC解决方案:构建高效数据管道

SeaTunnel 2023-11-29
979

在快速发展的数据驱动时代,实时数据处理已经成为企业决策和运营的关键因素。特别是在处理来自各种数据源的信息时,如何确保数据的及时、准确和高效同步变得尤为重要。

本文着重介绍了如何利用 SqlServer CDC 源连接器在 SeaTunnel 框架下实现 SQL Server 到其他数据系统的实时数据同步,这对于希望提升数据处理能力和实时数据分析的企业来说,具有重要的实践意义。

SQL Server CDC

SqlServer CDC 源连接器

支持 SQL Server 版本

  • • 服务器:2019(或更高版本,仅供参考)

支持引擎

SeaTunnel Zeta
Flink

主要特性

  • •  批处理

  • •  流处理

  • •  精确一次

  • •  列投影

  • •  并行处理

  • •  支持用户自定义分片

描述

SqlServer CDC 连接器允许从 SqlServer 数据库读取快照数据和增量数据。本文档描述了如何设置 SqlServer CDC 连接器以在 SqlServer 数据库上运行 SQL 查询。

支持的数据源信息

数据源支持的版本驱动URLMaven
SqlServer
  • 服务器:2019(或更高版本,仅供参考)

com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433;databaseName=column_type_test下载

安装 Jdbc 驱动

请下载并将 SqlServer 驱动放在 ${SEATUNNEL_HOME}/lib/
 目录下。例如:cp mssql-jdbc-xxx.jar ${SEATUNNEL_HOME}/lib/

数据类型映射

SQL Server 数据类型SeaTunnel 数据类型
CHAR
VARCHAR
NCHAR
NVARCHAR
STRUCT
CLOB
LONGVARCHAR
LONGNVARCHAR
STRING
BLOBBYTES
INTEGERINT
SMALLINT
TINYINT
SMALLINT
BIGINTBIGINT
FLOAT
REAL
FLOAT
DOUBLEDOUBLE
NUMERIC
DECIMAL(column.length(), column.scale().orElse(0))
DECIMAL(column.length(), column.scale().orElse(0))
TIMESTAMPTIMESTAMP
DATEDATE
TIMETIME
BOOLEAN
BIT
BOOLEAN

源选项

名称类型必需默认值描述
username字符串-连接数据库服务器时使用的用户名。
password字符串-连接数据库服务器时使用的密码。
database-names列表-需要监控的数据库名。
table-names列表-表名为模式名和表名的组合(databaseName.schemaName.tableName)。
base-url字符串-必须包含数据库的URL,如 "jdbc:sqlserver://localhost:1433;databaseName=test"。
startup.mode枚举INITIALSqlServer CDC 消费者的可选启动模式,有效枚举为 "initial"、"earliest"、"latest" 和 "specific"。
startup.timestamp长整型-从指定的纪元时间戳(以毫秒为单位)开始。
注意,当使用 "startup.mode" 选项为 'timestamp' 时,此选项是必需的。
startup.specific-offset.file字符串-从指定的 binlog 文件名开始。
注意,当 "startup.mode" 选项使用 'specific'
 时,此选项是必需的。
startup.specific-offset.pos长整型-从指定的 binlog 文件位置开始。
注意,当 "startup.mode" 选项使用 'specific'
 时,此选项是必需的。
stop.mode枚举NEVERSqlServer CDC 消费者的可选停止模式,有效枚举为 "never"。
stop.timestamp长整型-从指定的纪元时间戳(以毫秒为单位)停止。
注意,当 "stop.mode" 选项使用 'timestamp'
 时,此选项是必需的。
stop.specific-offset.file字符串-从指定的 binlog 文件名停止。
注意,当 "stop.mode" 选项使用 'specific'
 时,此选项是必需的。
stop.specific-offset.pos长整型-从指定的 binlog 文件位置停止。
注意,当 "stop.mode" 选项使用 'specific'
 时,此选项是必需的。
incremental.parallelism整型1增量阶段中并行读取器的数量。
snapshot.split.size整型8096表快照的分割大小(行数),快照期间的表会被分割成多个分片进行读取。
snapshot.fetch.size整型1024读取表快照时每次轮询的最大提取量。
server-time-zone字符串UTC数据库服务器中的会话时区。
connect.timeout时长30s连接器尝试连接到数据库服务器后等待超时的最大时间。
connect.max-retries整型3连接器尝试建立数据库服务器连接的最大重试次数。
connection.pool.size整型20连接池大小。
chunk-key.even-distribution.factor.upper-bound双精度浮点型100分块键分布因子的上界。此因子用于判断表数据是否均匀分布。如果计算出的分布因子小于或等于此上界值(即 (MAX(id) - MIN(id) + 1) 行数),则表分块将被优化为均匀分布。否则,如果分布因子更大,则表将被认为是不均匀分布的,并且如果估计的分片数超过 sample-sharding.threshold
 指定的值,将使用基于抽样的分片策略。默认值为 100.0。
chunk-key.even-distribution.factor.lower-bound双精度浮点型0.05分块键分布因子的下界。此因子用于判断表数据是否均匀分布。如果计算出的分布因子大于或等于此下界值(即 (MAX(id) - MIN(id) + 1) 行数),则表分块将被优化为均匀分布。否则,如果分布因子更小,则表将被认为是不均匀分布的,并且如果估计的分片数超过 sample-sharding.threshold
 指定的值,将使用基于抽样的分片策略。默认值为 0.05。
sample-sharding.threshold整型1000触发抽样分片策略的估计分片数阈值。当分布因子超出 chunk-key.even-distribution.factor.upper-bound
 和 chunk-key.even-distribution.factor.lower-bound
 指定的范围,并且估计的分片数(计算为近似行数 分块大小)超过此阈值时,将使用抽样分片策略。这可以帮助更有效地处理大型数据集。默认值为1000分片。
inverse-sampling.rate整型1000抽样分片策略中使用的抽样率的倒数。例如,如果这个值设置为1000,意味着抽样过程中应用了1/1000的抽样率。这个选项提供了在控制抽样粒度的灵活性,从而影响最终的分片数量。特别是在处理非常大的数据集时,更低的抽样率是首选。默认值为1000。
exactly_once布尔型true启用精确一次语义。
debezium.*配置-将Debezium的属性传递给用于从SqlServer服务器捕获数据变化的Debezium嵌入式引擎。
查看Debezium的SqlServer连接器属性获取更多信息
format枚举DEFAULTSqlServer CDC 的可选输出格式,有效枚举为 "DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON"。
common-options
-源插件的通用参数,请参考源通用选项获取详细信息。

任务示例

初始读取简单示例

这是一个流模式CDC初始化读取的示例,成功读取表数据后将进行增量读取。以下SQL DDL仅供参考。

env {
  # 在此处设置引擎配置
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.checkpoint.interval = 5000
}

source {
  # 仅用于测试和演示功能的示例源插件
  SqlServer-CDC {
    result_table_name = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode="initial"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}

transform {
}

sink {
  console {
    source_table_name = "customers"
  }
  

增量读取简单示例

这是一个增量阅读示例,用于阅读变更数据并打印。

env {
  # 在此处设置引擎配置
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.checkpoint.interval = 5000
}

source {
  # 仅用于测试和演示功能的示例源插件
  SqlServer-CDC {
    # 设置精确一次读取
    exactly_once=true 
    result_table_name = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode="latest"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}

transform {
}

sink {
  console {
    source_table_name = "customers"
  }
}

随着数据处理需求的不断增长和实时数据同步的重要性日益凸显,SqlServer CDC 源连接器在 SeaTunnel 生态系统中扮演着至关重要的角色。

通过本文的深入解析,我们希望您能够更好地理解并利用这一强大工具,从而实现数据流的高效、稳定和精准同步。

无论您是数据工程师、系统架构师还是业务分析师,掌握如何在 SeaTunnel 中部署和优化 SQL Server CDC 连接器,都将为您的数据处理能力带来显著提升。

新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南

从 0 到 1 快速入门 Apache SeaTunnel 

初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel


 MySQL 同步到 Hive / 从MySQL同步到StarRocks

通过 SeaTunnel 将数据写入 OSS-HDFS 

MySQL 到 Elasticsearch 实时同步解决方案


启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 

 部署 Apache SeaTunnel 分布式集群

Apache SeaTunnel Web部署指南

最佳实践

 OPPO 清风 天翼云 马蜂窝

孩子王 哔哩哔哩 唯品会


测试报告


 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!

最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!

比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布

SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比


Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台



仓库地址: 

https://github.com/apache/seatunnel


网址:

https://seatunnel.apache.org/


Apache SeaTunnel 下载地址:

https://seatunnel.apache.org/download

 

衷心欢迎更多人加入!


我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!


我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!


提交问题和建议:

https://github.com/apache/seatunnel/issues


贡献代码:

https://github.com/apache/seatunnel/pulls


订阅社区开发邮件列表 : 

dev-subscribe@seatunnel.apache.org


开发邮件列表:

dev@seatunnel.apache.org


加入 Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ


关注 Twitter: 

https://twitter.com/ASFSeaTunnel

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论