前言 环境要求 所需依赖 脚本准备 功能实践 总结
GitHub 地址 
一、前言
Flink Catalog 持久化是 Dinky 实践系列的第二篇,通过阅读本文,您将会熟悉 Dinky MySQL Catalog 持久化的用法。这个系列中,我们以 MySQL 做为 Source 端,StarRocks 做为 Sink 端做为演示。
在 Dinky 0.6.5 之前,在编写 FlinkSQL 作业时,FlinkSQL 的 DDL 语句可以采用 FlinkSQLEnv 环境引入。但这种方式对大量表结构进行初始化管理时存在局限性,为提供统一的 Flink 元数据管理能力,Dinky 在 0.6.6 实现了 Flink MySQL Catalog 功能,此功能与 Hive Catalog 相似,相比之前大大降低了表结构的维护成本。
二、环境要求
| 软件 | 版本 |
| CDH | 6.2.0 |
| Hadoop | 3.0.0-cdh6.2.0 |
| Flink | 1.13.6 |
| Flink CDC | 2.2.1 |
| StarRocks | 2.2.0 |
| Dinky | 0.6.6 |
| MYSQL | 5.7 |
三、所需依赖
依赖
Mysql Catalog 持久化需要在 Flink下加载周围组件所需要的 Flink connector 即可。依赖如下:
# hadoop依赖flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar# Flink Starrrocks依赖flink-connector-starrocks-1.2.2_flink-1.13_2.12.jar# Dinky hadoop依赖flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar# Dinky mysql catalog依赖dlink-catalog-mysql-1.13-0.6.6.jar# flink cdc依赖包flink-sql-connector-mysql-cdc-2.2.1.jar# mysql 驱动依赖mysql-connector-java-8.0.21.jar
说明
1.hadoop 依赖包放置 $FLINK_HOME/lib下
2.Flink StarRocks 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
3.Dinky hadoop 依赖包放置 $DINKY_HOME/plugins 下(网盘或者群公告下载)
4.Dinky MySQL Catalog 依赖放置 $FLINK_HOME/lib 下
5.Flink cdc 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
6.MySQL 驱动依赖放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下
四、脚本准备
MySQL 建表
如下 sql 脚本采用 Flink CDC 官网。
# mysql建表语句(同步到Starocks)CREATE TABLE bigdata.products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));ALTER TABLE bigdata.products AUTO_INCREMENT = 101;INSERT INTO bigdata.productsVALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");CREATE TABLE bigdata.orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- Whether order has been placed) AUTO_INCREMENT = 10001;INSERT INTO bigdata.ordersVALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
表解析
MySQL Catalog 持久化目前默认的 catalog 为 my_catalog,默认的 FlinkSQLEnv 为 DefaultCatalog。目前存储 MySQL Catalog 元数据的表结构如下:
| 元数据表 | 表含义 |
| metadata_database | 元数据 schema 信息 |
| metadata_table | 元数据table信息 |
| metadata_database_property | schema 属性信息 |
| metadata_table_property | table 属性信息 |
| metadata_column | 数据列信息 |
| metadata_function | UDF 信息 |
五、功能实践
作业脚本准备
DROP TABLE IF EXISTS ods_orders_src;CREATE TABLE IF NOT EXISTS ods_orders_src (`order_id` int COMMENT '', `order_date` timestamp(3) COMMENT '', `customer_name` string COMMENT '', `price` decimal(12,2) COMMENT '', `product_id` int COMMENT '', `order_status` tinyint COMMENT '',PRIMARY KEY(order_id) NOT ENFORCED) COMMENT ''WITH ('connector' = 'mysql-cdc','hostname' = '192.168.0.40','port' = '4406','username' = 'root','password' = 'Percona@020*','server-time-zone' = 'Asia/Shanghai','scan.incremental.snapshot.enabled' = 'true','scan.startup.mode'='initial','scan.incremental.snapshot.chunk.size' = '20000','heartbeat.interval' = '120s','database-name' = 'bigdata','table-name' = 'orders');DROP TABLE IF EXISTS ods_orders_sink;CREATE TABLE IF NOT EXISTS ods_orders_sink (`order_id` int COMMENT '', `order_date` timestamp(3) COMMENT '', `customer_name` string COMMENT '', `price` decimal(12,2) COMMENT '', `product_id` int COMMENT '', `order_status` tinyint COMMENT '',PRIMARY KEY(order_id) NOT ENFORCED) COMMENT ''WITH ('jdbc-url' = 'jdbc:mysql://192.168.0.5:19035','connector' = 'starrocks','database-name' = 'qhc_ods','table-name' = 'ods_orders','password' = '123456','load-url' = '192.168.0.5:18035','username' = 'devuser','sink.properties.format' = 'json','sink.properties.strip_outer_array' = 'true','sink.max-retries' = '10','sink.buffer-flush.interval-ms' = '15000','sink.parallelism' = '1');DROP TABLE IF EXISTS ods_products_src;CREATE TABLE IF NOT EXISTS ods_products_src (`id` int COMMENT '', `name` string COMMENT '', `description` string COMMENT '',PRIMARY KEY(id ) NOT ENFORCED) COMMENT ''WITH ('connector' = 'mysql-cdc','hostname' = '192.168.0.4','port' = '3306','username' = 'root','password' = '123456','server-time-zone' = 'Asia/Shanghai','scan.incremental.snapshot.enabled' = 'true','scan.startup.mode'='initial','scan.incremental.snapshot.chunk.size' = '20000','heartbeat.interval' = '120s','database-name' = 'bigdata','table-name' = 'products');DROP TABLE IF EXISTS ods_products_sink;CREATE TABLE IF NOT EXISTS ods_products_sink (`id` int COMMENT '', `name` string COMMENT '', `description` string COMMENT '',PRIMARY KEY(id ) NOT ENFORCED) COMMENT ''WITH ('jdbc-url' = 'jdbc:mysql://192.168.0.5:19035','connector' = 'starrocks','database-name' = 'qhc_ods','table-name' = 'ods_products','password' = '123456','load-url' = '192.168.0.5:18035','username' = 'devuser','sink.properties.format' = 'json','sink.properties.strip_outer_array' = 'true','sink.max-retries' = '10','sink.buffer-flush.interval-ms' = '15000','sink.parallelism' = '1');
创建初始化脚本作业
创建一个 ddl_init 作业,通过 yarn session模式提交,FlinkSQLEnv采用DefaultCatalog,作业如下:

执行作业后,在 dinky 元数据库查询是否表已经存在。
查看元数据表
每执行一次初始化DDL,将会覆盖之前的元数据。


查询 Source 的数据
新建一个作业 ods_order_src。
select * from ods_orders_src;

由此可以看到,对于所创建的表其实已经存在与 DefaultCatalog,即保存与 Mysql 元数据中。此时,可以通过创建任意作业去使用DefaultCatalog 中的表。

插入 Sink 表
还是在 ods_order_src 作业中,使用 insert 语句。将数据写入 StarRocks 中。
INSERT INTO ods_orders_sinkselect * from ods_orders_src;
提交 Flink 作业

查看 StarRocks 数据

说明
对于 MySQL Catalog 除上面用默认的 DefaultCatalog,那么也可以通过 create 创建 catalog,然后在对应数据库下执行 dlinkmysqlcatalog.sql。语法如下:
create catalog my_catalog with('type' = 'dlink_mysql','username' = 'dlink','password' = 'dlink','url' = 'jdbc:mysql://192.168.0.4:3306/dlink2?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true');use catalog my_catalog;
六、总结
交流
欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。
QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批。
微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,不写不批谢谢。
钉钉社区群(推荐):



扫描二维码获取
更多精彩
DataLink
数据中台





