暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

主流 OLTP 库表结构快速同步至 Apache Doris

93

引言

各位看官老爷,见字如面~

最近在给社区同学和商业化客户沟通交流时,发现很多同学初步接触或者学习时,遇到了不少入门时的卡点 —— 数据同步。

经常遇到想做 POC 测试,但是 Doris 数据库表的构建,对自己而言又是一个有门槛的学习过程。

同时堆积着自己的各种现有开发任务和其他工作的压力,很多时候虽然想玩一玩 Doris,或者想和已有的 OLTP 库对比一下查询上是否真的有很高的性能提升,骤然感觉鸭力山大,最后不得已而放弃……

为了防止大家勇于放弃,同时减少自我摸索的痛苦,本篇就来一起探讨一下,如何快速将业务 OLTP 库的库表结构,同步至 Apache Doris 中来。

话不多说,开搞!

适用场景

本次演示环境:内网环境,无防火墙和端口开放等干扰项。

适用源库:MySQL、Oracle、PostgreSQL、SQLServer、DB2、MongoDB

适用目标库:Apache Doris 社区版、SelectDB Doris 分发版

使用工具:单节点 Flink 以及 Flink 集群

命令执行:Shell 终端、Dbeaver、SelectDB-WebUI

方案说明

本方案借助 Flink 社区完善的 Source 生态,以及功能丰富强大的 Flink-Doris-Connector 组件来联合完成。

方案适用于以下几种应用情况:

  1. 1. 源库有大量的库表结构需要批量同步,需要一个快速的工具
  2. 2. 对 Doris 本身的库表结构构建熟悉程度低,希望先快速搭建一个可测试环境,在后续学习中逐步完善表结构
  3. 3. 对 Doris 本身的库表结构熟悉程度高,希望快速做完库表结构转换后,自行微调参数、类型以贴合自己业务达到最佳实践效果

本篇章不做数据实时同步演示,只做库表结构同步演示,若要做数据同步,可参考 Doris 官网 Flink-Doris-Connector 章节。

后文将给出使用 JDBC-Catalog 从外表 INSERT INTO SELECT 至内表的方式完成数据初始化同步案例。

MySQL 实操示例

Flink 组件快速部署

Flink 组件版本:1.16.3

在 Flink 官网 下载相应的压缩包至内网下任意一台节点部署即可

下载 Flink 二进制包
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
解压缩
tar -zxvf flink-1.16.3-bin-scala_2.12.tgz

备注

  1. 1. 同步时需要在 $FLINK_HOME/lib
     目录下添加对应的 Flink CDC 依赖,可至 Maven 中央仓库 搜索下载,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar 等
  2. 2. Connector 24.0.0 之后依赖的 Flink CDC 版本需要在 3.1 以上,如果需使用 Flink CDC 同步 MySQL 和 Oracle,还需要在 $FLINK_HOME/lib
     下增加相关的 JDBC 驱动。
进入 Flink 服务 lib 目录
cd flink-1.16.3/lib
下载相应 Flink CDC 依赖
MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
Doris
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.16/24.1.0/flink-doris-connector-1.16-24.1.0.jar

MySQL 快速同步

MySQL Server 版本:8.0.40

MySQL JDBC 版本:8.0.28

Flink MySQL Connector 版本:3.2.1

Flink Doris Connector 版本:1.16-24.1.0

使用 SSB 库表初始化 MySQL

配置和执行 Flink JOB 任务

<FLINK_HOME>bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-24.1.0.jar \
    mysql-sync-database \
    --database mysql_ssb \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=3306 \
    --mysql-conf username=admin \
    --mysql-conf password=123456 \
    --mysql-conf database-name=ssb \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --create-table-only "true" \
    --table-conf replication_num=3

检查 Doris 同步情况

使用 mysql_ssb 数据库
use mysql_ssb;
查看 tables
show tables;


Flink 任务常见问题

  1. 1. Flink 任务执行失败,提示如下信息:
java.lang.NoClassDefFoundError: org/apache/flink/cdc/debezium/DebeziumDeserializationSchema
           at org.apache.doris.flink.tools.cdc.CdcTools.createMySQLSyncDatabase(CdcTools.java:83)
           at org.apache.doris.flink.tools.cdc.CdcTools.main(CdcTools.java:56)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:568)
           at 

请检查是否使用了 Flink-Doris-Connector 对应的 CDC jar 包,若使用了 24.0 以上的 Flink-Doris-Connector,请使用 3.1 以后的 SQL CDC Connector。

  1. 1. Flink 构建任务失败,提示如下信息:
   The program finished with the following exception:
   
   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No tables to be synchronized. Please make sure whether the tables that need to be synchronized exist in the corresponding database or schema.

该问题为诸如 --including-tables
 等 mysql
 端参数异常,无法捕捉到对应的表,请检查对应参数信息。

  1. 1. 整库同步时出现部分表名、字段名解析异常,无法正常同步:请使用 --excluding-tables
     参数来做排除,如使用中文字段或其他非法字符构建的表结构,可使用 |
     分隔多个表,并支持正则表达式。比如 --excluding-tables table1

INSERT 数据同步

使用 Doris 的 JDBC-Catalog 功能,可简单快捷的同步主流 OLTP 库的数据至 Doris 指定表中,我们接着使用上述环境完成构建。

环境配置

将 MySQL-JDBC 驱动包下载同步至所有 Doris 节点下的 jdbc_drivers
 目录中,该目录默认在 DORIS_HOME
 根目录下。

本案例环境为 1FE 3BE 集群

跳至 FE 节点
ssh fe-01
进入 DORIS_HOME 下的 jdbc_drivers 目录,案例中 DORIS_HOME 路径为 home/doris/
cd /home/doris/jdbc_drivers
下载 MySQL JDBC Driver,要求最好 8.0.31 及以后版本
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.3.0/mysql-connector-j-8.3.0.jar
SCP 分发至其他所有节点,以 BE-01 节点为例
scp root@fe-01:/home/doris/jdbc_drivers/mysql-connector-j-8.3.0.jar root@be-01:/home/doris/jdbc_drivers

构建 MySQL JDBC-Catalog

CREATE CATALOG mysql PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="secret",
"jdbc_url"="jdbc:mysql://example.net:3306",
"driver_url"="mysql-connector-j-8.3.0.jar",
"driver_class"="com.mysql.cj.jdbc.Driver"
)

执行成功后可使用 switch mysql
 命令,或者在 SelectDB-WebUI 上观察到外表数据库的层级结构。

同步数据

MySQL 插入初始数据

在 MySQL 端我们往 customer
 表中插入 3 条数据作为示例:

INSERT INTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(0,'1','1','1','1','1','1','1');
INSERTINTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(1,'2','2','2','2','2','2','2');
INSERTINTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(2,'3','3','3','3','3','3','3');

执行同步语句

连接 Doris 客户端,执行外表写入内表命令:

INSERT INTO mysql_ssb.customer
SELECT * FROM mysql.ssb.customer;

可以看到数据已成功同步至 Doris 中。

注意事项

  1. 1. 经过测试,JDBC-Catalog 同步速度大致在 10-30MB/s,若需要进行大规模数据同步(TB级),可以使用其他方式,如 Datax、Spark、StreamLoader 等工具并发导入。
  2. 2. 查询时若未 USE database
     ,内表表名指定格式为 database_name.table_name
     ,外表表名指定格式为 catalog_name.database_name.table_name
     ,以上述案例为例,catalog_name
     为 mysql
    ,故此整体的查询语句的 From 后指定的表名限定名为 mysql.ssb.customer
    ,而内表表名为 mysql_ssb.customer
    此处易混淆,请仔细甄别,也可将 MySQL 外表 Catalog 名设置为其他易分辨的名称。

小结

按如上流程,MySQL 等主流 OLTP 库的库表结构同步工作将无需再进行大量的人工转义工作,基本的字段映射、表类型映射等工作,都可以使用该方案快速满足,在此特别感谢我迪哥 吴迪(JNSimba三年的倾情贡献!

当然使用该方案构建的库表结构,不一定是业务最优的使用结构,故此若发现查询阶段有速度慢或其他异常情况,还需根据业务诉求调整表结构来完成整体业务流程的优化工作。

这个方案就是抛转引用供大家快速 POC 和测试验证的方案,在合理规划后也可用于生产的数据迁移方案,具体如何落地还需各位架构师因地制宜,适配而行~

都看到这了,来个点赞和在看吧!这是最大的更新动力了!

下次一起看看 ELK 场景的 Doris 快速验证方案。

以上~


文章转载自Apache Doris 补习班,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论