暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用 Dinky 可视化构建 Apache Doris 数据开发同步任务

大数据技能圈 2024-12-27
153

引言

上一篇我们用命令行提交任务的方式演示了如何快速同步 MySQL 库表结构至 Doris,并利用自带的 JDBC-Catalog 能力将 OLTP 库数据同步至 Doris 中。

但对于一些经常做常规开发的同学而言,一个 POC 测试的方案还不足以满足业务日常开发的完整诉求。

这类完整诉求包含了功能完整度、运维难度、开发难度、管理难度等一系列考虑点在内。

故此本篇我们一起来探讨学习如何使用 Dinky 这款开源Flink任务管理平台来完成 OLTP 库至 Apache Doris 的任务构建的。

话不多说,开搞!

Dinky 简介

Dinky (https://www.dinky.org.cn/) 是一个以 Apache Flink 为基础、开箱即用的一站式实时计算平台,连接数据湖仓等众多框架,致力于流批一体和湖仓一体的建设与实践。

同时具备以下几方面的核心特性:

有兴趣的同学可至官网翻看更详细的文档内容,这里不做更多赘述。

部署 Dinky 环境

安装 Dinky

Dinky 部署版本:1.16-1.2.0【前者是 Flink 版本号,后者是 Dinky 版本号】

Dinky 需要具备内置的 Flink 环境,将相关依赖放置在指定目录下

本篇的演示 DEMO 依旧以 MySQL -> Doris 为例

下载安装包

下载 Dinky 二进制包
https://github.com/DataLinkDC/dinky/releases/download/v1.2.0/dinky-release-1.16-1.2.0.tar.gz
解压缩安装包
tar -zxvf dinky-release-1.16-1.2.0.tar.gz
重命名 Dinky 目录
mv dinky-release-1.16-1.2.0 dinky-1.2.0
进入根目录
cd dinky-1.20

MySQL 配置及 Conf 文件修改【可选】

本篇为缩减篇幅不显臃肿,采用默认 h2 数据库作为元数据管理组件

若要生产使用,请使用 MySQL 或 PostgreSQL 作为元数据管理数据库

使用 h2 数据库无法提交 Flink yarn application 任务

【重要】h2 数据库当前默认不提供持久化,所以一旦重启,所有元数据将丢失!(每次重启,都是崭新的人生!)

若有生产配置需求,请访问 Dinky 官网依据部署教程修改和配置相应参数,本篇不做赘述。

开放端口

Dinky 的 WebUI 端口默认为 8888,可在 Conf 文件进行修改。

整库同步方案

Dinky 平台有多种整库同步功能,无论是 Pipeline 模式还是 Doris-Connnector 实现的整库同步方案,均可完成本篇的任务目的,故此我们将同步采用这两种模式做以演示,供各位看官老爷自行选择。

Doris-Connector 整库同步

使用 Doris-Connector 做整库同步,可不依赖 Flink 进程,只需要三个 Jar 包即可完成 Standalone 模式的整库同步任务构建。

同时使用 Doris-Connector 构建时,若只需同步库表结构,不需要构建CDC任务时,则可以不考虑非主键模式下的库表同步问题,是整库同步库表结构的一大利器!

添加 lib 依赖

依赖添加的方式有两种:

  • • 在Dinky-配置中心,设置Resource的目录地址,将三个依赖包移至该目录下
  •   然后同步目录结构刷新资源
  • • 在Dinky-注册中心-资源栏目,通过客户端上传三个依赖包到服务端

需要放置的 Jar 包有如下几个:

  • • Flink-Doris-Connector
  • • Flink-SQL-Connector-MySQL-CDC
  • • MySQL-JDBC-Driver
MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
Doris
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.16/24.1.0/flink-doris-connector-1.16-24.1.0.jar

构建 Jar 任务

  • • 创建作业
  • • 选择 Jar 任务
  • • 填写任务信息,按引导框填写内容构建任务,如下图:
    任务构建内容如下:
  程序路径
  rs:/flink-doris-connector-1.16-24.1.0.jar
  程序运行类
  org.apache.doris.flink.tools.cdc.CdcTools
  程序运行参数
  mysql-sync-database
--database doris_database_name
--mysql-conf hostname=127.0.0.1
--mysql-conf port=3306
--mysql-conf username=admin
--mysql-conf password=123456
--mysql-conf database-name=ssb
--sink-conf fenodes=127.0.0.1:8030
--sink-conf username=root
--sink-conf password=123456
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030
--sink-conf sink.label-prefix=label
--table-conf replication_num=1
--create-table-only=true

请注意,最后一个配置项 --create-table-only=true
 意为只同步表结构,不做CDC数据同步。

  • • 若需要构建完表结构后继续做CDC数据同步
  • • 左边窗口填写 Checkpoint 的 SET 语句
# checkpoint设置,不写会出现数据无法写入情况
SET 'execution.checkpointing.interval' = '10s';


  • • 如果 MySQL 表有非主键表,则需按表同步需求设置同步 Key 字段
# 同步的库表中含有非主键表时
# 必须设置 scan.incremental.snapshot.chunk.key-column 参数
# 且只能选择非空类型的一个字段。
# 不同的库表列之间用,隔开,例如:
scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...
# 完整在程序运行参数中的格式为
--mysql-conf scan.incremental.snapshot.chunk.key-column=database.table:column

  • • 运行任务查看执行状态
  • • 可以在运维中心查看任务状态
  • • Doris WebUI 查看库表是否正确同步

Dinky Pipeline 整库同步

添加 Dinky 依赖

我们需要放置的 Jar 依赖包有如下四个:

  • • Flink-CDC-3.2.1-BIN
  • • Flink-CDC-Pipeline-Connector-MySQL-3.2.1
  • • Flink-CDC-Pipeline-Connector-Doris-3.2.1
  • • MySQL-JDBC-Driver-8.0.28
进入依赖目录
cd extends/flink1.16
Flink CDC
wget https://archive.apache.org/dist/flink/flink-cdc-3.2.1/flink-cdc-3.2.1-bin.tar.gz
MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.2.1/flink-cdc-pipeline-connector-mysql-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
Doris
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.2.1/flink-cdc-pipeline-connector-doris-3.2.1.jar

处理冲突依赖

  • • 处理 Dist 依赖

如果直接在 Dinky 使用 flink-cdc-dist-3.2.1.jar
 会有 java.lang.NoSuchMethodError: org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList; 错误,需预处理一遍

解压 flink-cdc-3.2.1-bin.tar.gz
tar -zxvf flink-cdc-3.2.1-bin.tar.gz       
cd flink-cdc-3.2.1/lib/
解压jar文件·
jar -xvf flink-cdc-dist-3.2.1.jar
删除冲突包
rm -rf org/apache/calcite
删除原 Jar 包
rm -rf flink-cdc-dist-3.2.1.jar
重新打包
jar -cvf flink-cdc-dist-3.2.1.jar * 
移动 Jar 包
mv flink-cdc-dist-3.2.1.jar ../../

  • • 处理 table-planner 依赖
进入依赖目录
cd extends/flink1.16/
Flink
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz

  • • 调整依赖
解压缩
tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
移动 lib
mv flink-1.16.3/lib/* ./
移动 table-planner 依赖
mv flink-1.16.3/opt/flink-table-planner_2.12-1.16.3.jar ./
删除解压缩目录
rm -rf flink-1.16.3
删除冲突 table-planner 依赖
rm -rf flink-table-planner-loader-1.16.3.jar

  • • 清理多余内容
rm -rf flink-cdc-3.2.1/ flink-cdc-3.2.1-bin.tar.gz flink-1.16.3-bin-scala_2.12.tgz 

启动 Dinky

bash bin/auto.sh start

Flink Standalone 启动

请注意,如果需要除本地机器外访问 8081
 端口,请在 conf/flink-conf.yaml
 文件中调整 rest.address
rest.bind-address
 为 0.0.0.0

进入指定目录
cd /opt/software/
下载 Flink 二进制包
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
解压缩
tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
cd flink-1.16.3/
拷贝 Dinky-app Jar 包至 flink/lib 目录下
cp ../dinky-1.2.0/jar/dinky-app-1.16-1.2.0-jar-with-dependencies.jar ./lib/
拷贝在 Dinky 调整好的 dist、table-planner 依赖替换 flink/lib 目录下依赖
cp ../dinky-1.2.0/extends/flink1.16/flink-dist-1.16.3.jar./lib/
cp ../dinky-1.2.0/extends/flink1.16/flink-table-planner_2.12-1.16.3.jar./lib/
rm -rf ./lib/flink-table-planner-loader-1.16.3.jar
拷贝 MySQL-JDBC-Driver
cp ../dinky-1.2.0/extends/flink1.16/mysql-connector-java-8.0.28.jar./lib/
启动 Flink
./bin/start_cluster.sh

Dinky 注册 Flink 实例

配置任务

我们以同步一张非主键的 MySQL 表 customer
 为例,选择默认 Catalog 和 Flink-Standalone
 资源:

SET 'execution.checkpointing.interval' '10s';
EXECUTE PIPELINE WITHYAML (
source:
  typemysql
  scan.incremental.snapshot.chunk.key-columnssb_test.customer:c_name
  hostnamelocalhost
  port3306
  usernameadmin
  password'Syj123456'
  tablesssb_test.customer
  server-id5400-5404
sink:
  typedoris
  fenodes192.168.31.88:8030
  usernameroot
  password'Doris2024'
  table.create.properties.light_schema_changetrue
  table.create.properties.replication_num1
pipeline:
  nameSyncMySQLDatabasetoDoris
  parallelism1
)

运行任务

运维中心观察任务状态

校验数据

MySQL 端:

Doris 端:

使用建议

最后,给准备深度使用的同学给一个建议,如果要添加诸如 MySQL、PG、Oracle 等 CDC Pipeline 等依赖包时,建议使用 Resource
 资源管理器和 ADD JAR
 或 ADD CUSTOMJAR
 语法来按任务添加相关依赖,一方面可以做到依赖冲突的隔离,另一方面可以按需加载,无需每次重启 Dinky 服务。

小节

这一套折腾的确累人,本来预想两天搞完,没想到搞了小一周,一方面时间有限,只能下班后加班搞搞,另一方面对 Dinky 的确不熟悉,依赖冲突处理花费了很多时间。

同时为了满足每一个希望照着可以完全复现的同学都可以成功部署,这个方案的环境是铲了部署,部署后再铲,前前后后从零部署环境到实验,大概做了五次,直到后续三次依照文档可完全满足照搬即可的既定目标后,我才放心定稿。

在这里特别鸣谢 Dinky 社区的老朱、凯哥、高哥、洲哥四位巨佬的鼎力相助,还有老朱的小号也得鸣谢一下😄,四位大佬在这篇文章助我很多,也希望 Doris 和 Dinky 两社区继续长长久久~

同时如果想加入  补习班、Doris、PowerData、Dinky 这三个官方社区和一个本号社区的任意一个社群,都可以加我微信:fl_manyi

注明来意,看到速拉!

都看到这里了,不得来个点赞在看鼓励一下呀!

下期见~



文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论