GitHub repository
1. Preface
2. Environment Requirements
| Software | Version |
| --- | --- |
| CDH | 6.2.0 |
| Hadoop | 3.0.0-cdh6.2.0 |
| Hive | 2.1.1-cdh6.2.0 |
| Flink | 1.13.6 |
| Flink CDC | 2.2.1 |
| Dinky | 0.6.6 |
| MySQL | 5.7 |
| ClickHouse | 22.2.2.1 (standalone) |
3. Compiling the Flink ClickHouse Connector
Download the Flink ClickHouse connector
git clone https://github.com/itinycheng/flink-connector-clickhouse.git
cd flink-connector-clickhouse/
git checkout -b release-1.13 origin/release-1.13
Build the Flink ClickHouse connector in IDEA
Modify the pom.xml:
Change <version>1.13.2-SNAPSHOT</version> to <version>1.13.6</version>
Change <flink.version>1.13.2</flink.version> to <flink.version>1.13.6</flink.version>
# Adjust the Scala version to match your environment
Change <scala.binary.version>2.11</scala.binary.version> to <scala.binary.version>2.12</scala.binary.version>
Once the pom.xml is updated, build the connector with a standard Maven package (e.g. mvn clean package -DskipTests).

After the build completes, the resulting jar is as follows:

4. Required Dependencies
# Hive dependencies
antlr-runtime-3.5.2.jar
hive-exec-2.1.1-cdh6.2.0.jar
libfb303-0.9.3.jar
flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar
hive-site.xml
# Hadoop dependency
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
# Dinky MySQL catalog dependency
dlink-catalog-mysql-1.13-0.6.6-SNAPSHOT.jar
# Dinky Hadoop dependency
flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
# MySQL driver
mysql-connector-java-8.0.21.jar
# ClickHouse dependencies
clickhouse-jdbc-0.2.6.jar
flink-connector-clickhouse-1.13.6.jar
# Flink CDC dependency
flink-sql-connector-mysql-cdc-2.2.1.jar
Notes
1. Hive dependencies go under $FLINK_HOME/lib and $DINKY_HOME/plugins.
2. The Hadoop dependency goes under $FLINK_HOME/lib.
3. The MySQL driver goes under $FLINK_HOME/lib and $DINKY_HOME/plugins.
4. The ClickHouse dependencies go under $FLINK_HOME/lib and $DINKY_HOME/plugins.
5. The Dinky MySQL catalog dependency goes under $FLINK_HOME/lib.
6. The Flink CDC dependency goes under $FLINK_HOME/lib and $DINKY_HOME/plugins.
7. The Dinky Hadoop dependency goes under $DINKY_HOME/plugins (download it from the netdisk or the group announcement).
After placing the dependencies above, restart the Flink cluster and Dinky.
5. Script Preparation
MySQL DDL
# MySQL DDL
CREATE TABLE bigdata.products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE bigdata.products AUTO_INCREMENT = 101;
INSERT INTO bigdata.products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");
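Note that the CDC jobs in sections 6 and 7 actually read bigdata.orders, while the DDL above creates bigdata.products. The original orders DDL is not shown in this article; below is a minimal sketch reconstructed from the Flink source table schema and the sample INSERT in section 7, so the exact column types are assumptions rather than the original definition:

-- Hypothetical DDL for bigdata.orders, reconstructed from the Flink
-- source table and the sample INSERT in section 7; types are assumptions.
CREATE TABLE bigdata.orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- whether the order has been placed
);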
ClickHouse DDL
DROP TABLE test.test_orders;
CREATE TABLE test.test_orders (
  order_id Int64 NOT NULL,
  order_date DATETIME NOT NULL,
  customer_name String NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id Int64 NOT NULL,
  order_status BOOLEAN NOT NULL -- whether the order has been placed
)
ENGINE = MergeTree()
ORDER BY order_id
PRIMARY KEY order_id;
6. Hive Catalog Job
Create the job script
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 60000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 10000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 5;
SET restart-strategy.fixed-delay.delay = 30s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 512m;
SET taskmanager.numberOfTaskSlots = 2;
SET yarn.application.queue = root.users.flink;

LOAD MODULE hive WITH ('hive-version' = '2.1.1');

CREATE CATALOG qhc_ods_catalog WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-version' = '2.1.1',
  'hive-conf-dir' = '/etc/hive/conf',
  'hadoop-conf-dir' = '/etc/hadoop/conf'
);

DROP TABLE IF EXISTS qhc_ods_catalog.qhc_ods.ods_orders_src;
CREATE TABLE IF NOT EXISTS qhc_ods_catalog.qhc_ods.ods_orders_src (
  `order_id` int COMMENT '',
  `order_date` timestamp(3) COMMENT '',
  `customer_name` string COMMENT '',
  `price` decimal(12,2) COMMENT '',
  `product_id` int COMMENT '',
  `order_status` tinyint COMMENT '',
  PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.0.4',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.startup.mode' = 'initial',
  'scan.incremental.snapshot.chunk.size' = '20000',
  'heartbeat.interval' = '120s',
  'database-name' = 'bigdata',
  'table-name' = 'orders'
);

DROP TABLE IF EXISTS qhc_ods_catalog.qhc_ods.ods_orders_sink;
CREATE TABLE IF NOT EXISTS qhc_ods_catalog.qhc_ods.ods_orders_sink (
  `order_id` BIGINT COMMENT '',
  `order_date` timestamp(3) COMMENT '',
  `customer_name` string COMMENT '',
  `price` decimal(12,5) COMMENT '',
  `product_id` BIGINT COMMENT '',
  `order_status` tinyint COMMENT '',
  PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://192.168.0.5:8123',
  'username' = 'default',
  'password' = '123456',
  'database-name' = 'test',
  'table-name' = 'test_orders',
  'sink.batch-size' = '500',
  'sink.flush-interval' = '1000',
  'sink.max-retries' = '3'
);

INSERT INTO qhc_ods_catalog.qhc_ods.ods_orders_sink
SELECT * FROM qhc_ods_catalog.qhc_ods.ods_orders_src;
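Because ods_orders_src and ods_orders_sink are created through the Hive catalog, their definitions are persisted in the Hive Metastore and survive session restarts. As a quick sanity check, the registration can be verified from the Flink SQL client or a Dinky query tab (a minimal sketch, assuming the script above has already run):

USE CATALOG qhc_ods_catalog;
USE qhc_ods;
-- Should list ods_orders_src and ods_orders_sink
SHOW TABLES;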

Submit the Flink job
As shown below, the Flink job runs normally.

Check ClickHouse
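To spot-check the synchronized data, the sink table can be queried directly in clickhouse-client (a minimal sketch against the table created in section 5):

-- Spot-check the rows synchronized from MySQL
SELECT * FROM test.test_orders ORDER BY order_id LIMIT 10;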

7. Dinky MySQL Catalog Job
Create the job script
DROP TABLE IF EXISTS ods_orders_src;
CREATE TABLE IF NOT EXISTS ods_orders_src (
  `order_id` int COMMENT '',
  `order_date` timestamp(3) COMMENT '',
  `customer_name` string COMMENT '',
  `price` decimal(12,2) COMMENT '',
  `product_id` int COMMENT '',
  `order_status` tinyint COMMENT '',
  PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.0.4',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.startup.mode' = 'initial',
  'scan.incremental.snapshot.chunk.size' = '20000',
  'heartbeat.interval' = '120s',
  'database-name' = 'bigdata',
  'table-name' = 'orders'
);

DROP TABLE IF EXISTS ods_orders_sink;
CREATE TABLE IF NOT EXISTS ods_orders_sink (
  `order_id` BIGINT COMMENT '',
  `order_date` timestamp(3) COMMENT '',
  `customer_name` string COMMENT '',
  `price` decimal(12,5) COMMENT '',
  `product_id` BIGINT COMMENT '',
  `order_status` tinyint COMMENT '',
  PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://192.168.0.5:8123',
  'username' = 'default',
  'password' = '123456',
  'database-name' = 'test',
  'table-name' = 'test_orders',
  'sink.batch-size' = '500',
  'sink.flush-interval' = '1000',
  'sink.max-retries' = '3'
);
Create the initialization job script

After executing the job, query the Dinky metadata database to check whether the tables now exist.
View the metadata tables
Each execution of the initialization DDL updates Flink's metadata. The structure tree on the left shows the catalog's tables, views, UDFs, and other objects.
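A minimal sketch for checking this from the MySQL side; it assumes Dinky's default backing database name dlink and that the catalog tables share a metadata prefix, both of which may differ by version:

-- List Dinky's catalog metadata tables (the database name `dlink` and
-- the `metadata` prefix are assumptions; adjust to your installation)
USE dlink;
SHOW TABLES LIKE 'metadata%';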

Submit the Flink job
In the mysql_catalog_cdc_orders job, use an INSERT statement to write the data into ClickHouse.
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 60000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 10000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 5;
SET restart-strategy.fixed-delay.delay = 30s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
SET pipeline.name = mysql_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 512m;
SET taskmanager.numberOfTaskSlots = 2;
SET yarn.application.queue = root.users.flink;

INSERT INTO ods_orders_sink
SELECT * FROM ods_orders_src;

Insert data into the source database
INSERT INTO bigdata.orders
VALUES (default, '2020-07-30 12:12:30', 'lucy', 25.25, 10000, true);
Check ClickHouse
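Since the job runs in streaming mode, the new row should appear in ClickHouse shortly after the INSERT above. A minimal check for it:

-- Verify the CDC pipeline propagated the newly inserted row
SELECT * FROM test.test_orders WHERE customer_name = 'lucy';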

8. Summary
Community
You are welcome to join the community to share, discuss, and offer feedback, and to contribute to the community yourself.
QQ group: 543709668. When applying, include the note "Dinky + company + position"; applications without it will not be approved.
Official WeChat group (recommended): add wenmo_ai with the note "Dinky + company + position"; applications without it will not be approved.
DingTalk community group (recommended):








