引言
上一篇我们用命令行提交任务的方式演示了如何快速同步 MySQL 库表结构至 Doris,并利用自带的 JDBC-Catalog 能力将 OLTP 库数据同步至 Doris 中。
但对于一些经常做常规开发的同学而言,一个 POC 测试的方案还不足以满足业务日常开发的完整诉求。
这类完整诉求包含了功能完整度、运维难度、开发难度、管理难度等一系列考虑点在内。
故此本篇我们一起来探讨学习如何使用 Dinky 这款开源Flink任务管理平台来完成 OLTP 库至 Apache Doris 的任务构建的。
话不多说,开搞!
Dinky 简介
Dinky (https://www.dinky.org.cn/) 是一个以 Apache Flink 为基础、开箱即用的一站式实时计算平台,连接数据湖仓等众多框架,致力于流批一体和湖仓一体的建设与实践。

同时具备以下几方面的核心特性:

有兴趣的同学可至官网翻看更详细的文档内容,这里不做更多赘述。
部署 Dinky 环境
安装 Dinky
Dinky 部署版本:1.16-1.2.0【前者是 Flink 版本号,后者是 Dinky 版本号】
Dinky 需要具备内置的 Flink 环境,将相关依赖放置在指定目录下
本篇的演示 DEMO 依旧以 MySQL -> Doris 为例
下载安装包
# 下载 Dinky 二进制包
https://github.com/DataLinkDC/dinky/releases/download/v1.2.0/dinky-release-1.16-1.2.0.tar.gz
# 解压缩安装包
tar -zxvf dinky-release-1.16-1.2.0.tar.gz
# 重命名 Dinky 目录
mv dinky-release-1.16-1.2.0 dinky-1.2.0
# 进入根目录
cd dinky-1.20

MySQL 配置及 Conf 文件修改【可选】
本篇为缩减篇幅不显臃肿,采用默认 h2 数据库作为元数据管理组件
若要生产使用,请使用 MySQL 或 PostgreSQL 作为元数据管理数据库
使用 h2 数据库无法提交 Flink yarn application 任务
【重要】h2 数据库当前默认不提供持久化,所以一旦重启,所有元数据将丢失!(每次重启,都是崭新的人生!)
若有生产配置需求,请访问 Dinky 官网依据部署教程修改和配置相应参数,本篇不做赘述。
开放端口
Dinky 的 WebUI 端口默认为 8888,可在 Conf 文件进行修改。
整库同步方案
Dinky 平台有多种整库同步功能,无论是 Pipeline 模式还是 Doris-Connnector 实现的整库同步方案,均可完成本篇的任务目的,故此我们将同步采用这两种模式做以演示,供各位看官老爷自行选择。
Doris-Connector 整库同步
使用 Doris-Connector 做整库同步,可不依赖 Flink 进程,只需要三个 Jar 包即可完成 Standalone 模式的整库同步任务构建。
同时使用 Doris-Connector 构建时,若只需同步库表结构,不需要构建CDC任务时,则可以不考虑非主键模式下的库表同步问题,是整库同步库表结构的一大利器!
添加 lib 依赖
依赖添加的方式有两种:
• 在Dinky-配置中心,设置Resource的目录地址,将三个依赖包移至该目录下 
然后同步目录结构刷新资源 
• 在Dinky-注册中心-资源栏目,通过客户端上传三个依赖包到服务端 
需要放置的 Jar 包有如下几个:
• Flink-Doris-Connector • Flink-SQL-Connector-MySQL-CDC • MySQL-JDBC-Driver
# MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Doris
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.16/24.1.0/flink-doris-connector-1.16-24.1.0.jar
构建 Jar 任务
• 创建作业 
• 选择 Jar 任务 
• 填写任务信息,按引导框填写内容构建任务,如下图: 
任务构建内容如下:
# 程序路径
rs:/flink-doris-connector-1.16-24.1.0.jar
# 程序运行类
org.apache.doris.flink.tools.cdc.CdcTools
# 程序运行参数
mysql-sync-database
--database doris_database_name
--mysql-conf hostname=127.0.0.1
--mysql-conf port=3306
--mysql-conf username=admin
--mysql-conf password=123456
--mysql-conf database-name=ssb
--sink-conf fenodes=127.0.0.1:8030
--sink-conf username=root
--sink-conf password=123456
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030
--sink-conf sink.label-prefix=label
--table-conf replication_num=1
--create-table-only=true
请注意,最后一个配置项 --create-table-only=true
意为只同步表结构,不做CDC数据同步。
• 若需要构建完表结构后继续做CDC数据同步 • 左边窗口填写 Checkpoint 的 SET 语句
# checkpoint设置,不写会出现数据无法写入情况
SET 'execution.checkpointing.interval' = '10s';

• 如果 MySQL 表有非主键表,则需按表同步需求设置同步 Key 字段
# 同步的库表中含有非主键表时
# 必须设置 scan.incremental.snapshot.chunk.key-column 参数
# 且只能选择非空类型的一个字段。
# 不同的库表列之间用,隔开,例如:
scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...
# 完整在程序运行参数中的格式为
--mysql-conf scan.incremental.snapshot.chunk.key-column=database.table:column
• 运行任务查看执行状态 
• 可以在运维中心查看任务状态 
• Doris WebUI 查看库表是否正确同步 
Dinky Pipeline 整库同步
添加 Dinky 依赖
我们需要放置的 Jar 依赖包有如下四个:
• Flink-CDC-3.2.1-BIN • Flink-CDC-Pipeline-Connector-MySQL-3.2.1 • Flink-CDC-Pipeline-Connector-Doris-3.2.1 • MySQL-JDBC-Driver-8.0.28
# 进入依赖目录
cd extends/flink1.16
# Flink CDC
wget https://archive.apache.org/dist/flink/flink-cdc-3.2.1/flink-cdc-3.2.1-bin.tar.gz
# MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.2.1/flink-cdc-pipeline-connector-mysql-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Doris
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.2.1/flink-cdc-pipeline-connector-doris-3.2.1.jar
处理冲突依赖
• 处理 Dist 依赖
如果直接在 Dinky 使用 flink-cdc-dist-3.2.1.jar
会有 java.lang.NoSuchMethodError: org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList; 错误,需预处理一遍
# 解压 flink-cdc-3.2.1-bin.tar.gz
tar -zxvf flink-cdc-3.2.1-bin.tar.gz
cd flink-cdc-3.2.1/lib/
# 解压jar文件·
jar -xvf flink-cdc-dist-3.2.1.jar
# 删除冲突包
rm -rf org/apache/calcite
# 删除原 Jar 包
rm -rf flink-cdc-dist-3.2.1.jar
# 重新打包
jar -cvf flink-cdc-dist-3.2.1.jar *
# 移动 Jar 包
mv flink-cdc-dist-3.2.1.jar ../../
• 处理 table-planner 依赖
# 进入依赖目录
cd extends/flink1.16/
# Flink
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
• 调整依赖
# 解压缩
tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
# 移动 lib
mv flink-1.16.3/lib/* ./
# 移动 table-planner 依赖
mv flink-1.16.3/opt/flink-table-planner_2.12-1.16.3.jar ./
# 删除解压缩目录
rm -rf flink-1.16.3
# 删除冲突 table-planner 依赖
rm -rf flink-table-planner-loader-1.16.3.jar
• 清理多余内容
rm -rf flink-cdc-3.2.1/ flink-cdc-3.2.1-bin.tar.gz flink-1.16.3-bin-scala_2.12.tgz
启动 Dinky
bash bin/auto.sh start


Flink Standalone 启动
请注意,如果需要除本地机器外访问 8081
端口,请在 conf/flink-conf.yaml
文件中调整 rest.address
、rest.bind-address
为 0.0.0.0
。
# 进入指定目录
cd /opt/software/
# 下载 Flink 二进制包
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
# 解压缩
tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
cd flink-1.16.3/
# 拷贝 Dinky-app Jar 包至 flink/lib 目录下
cp ../dinky-1.2.0/jar/dinky-app-1.16-1.2.0-jar-with-dependencies.jar ./lib/
# 拷贝在 Dinky 调整好的 dist、table-planner 依赖替换 flink/lib 目录下依赖
cp ../dinky-1.2.0/extends/flink1.16/flink-dist-1.16.3.jar./lib/
cp ../dinky-1.2.0/extends/flink1.16/flink-table-planner_2.12-1.16.3.jar./lib/
rm -rf ./lib/flink-table-planner-loader-1.16.3.jar
# 拷贝 MySQL-JDBC-Driver
cp ../dinky-1.2.0/extends/flink1.16/mysql-connector-java-8.0.28.jar./lib/
# 启动 Flink
./bin/start_cluster.sh

Dinky 注册 Flink 实例

配置任务
我们以同步一张非主键的 MySQL 表 customer
为例,选择默认 Catalog 和 Flink-Standalone
资源:

SET 'execution.checkpointing.interval' = '10s';
EXECUTE PIPELINE WITHYAML (
source:
type: mysql
scan.incremental.snapshot.chunk.key-column: ssb_test.customer:c_name
hostname: localhost
port: 3306
username: admin
password: 'Syj123456'
tables: ssb_test.customer
server-id: 5400-5404
sink:
type: doris
fenodes: 192.168.31.88:8030
username: root
password: 'Doris2024'
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: SyncMySQLDatabasetoDoris
parallelism: 1
)
运行任务

运维中心观察任务状态

校验数据
MySQL 端:

Doris 端:

使用建议
最后,给准备深度使用的同学给一个建议,如果要添加诸如 MySQL、PG、Oracle 等 CDC Pipeline 等依赖包时,建议使用 Resource
资源管理器和 ADD JAR
或 ADD CUSTOMJAR
语法来按任务添加相关依赖,一方面可以做到依赖冲突的隔离,另一方面可以按需加载,无需每次重启 Dinky 服务。
小节
这一套折腾的确累人,本来预想两天搞完,没想到搞了小一周,一方面时间有限,只能下班后加班搞搞,另一方面对 Dinky 的确不熟悉,依赖冲突处理花费了很多时间。
同时为了满足每一个希望照着可以完全复现的同学都可以成功部署,这个方案的环境是铲了部署,部署后再铲,前前后后从零部署环境到实验,大概做了五次,直到后续三次依照文档可完全满足照搬即可的既定目标后,我才放心定稿。
在这里特别鸣谢 Dinky 社区的老朱、凯哥、高哥、洲哥四位巨佬的鼎力相助,还有老朱的小号也得鸣谢一下😄,四位大佬在这篇文章助我很多,也希望 Doris 和 Dinky 两社区继续长长久久~
同时如果想加入 补习班、Doris、PowerData、Dinky 这三个官方社区和一个本号社区的任意一个社群,都可以加我微信:fl_manyi
注明来意,看到速拉!
都看到这里了,不得来个点赞和在看鼓励一下呀!
下期见~




