暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dlink 在 Flink-mysql-cdc 到 Doris 的实践

750
摘要:本文介绍了 Dlink 在 Flink-mysql-cdc 实时入库 Doris 实践分享。内容包括:
  1. 背景
  2. 准备
  3. 部署
  4. 数据表
  5. FlinkSQL
  6. 调试
  7. 总结
  8. 未来
  9. 交流


Tips:历史传送门
Dlink ?一款FlinkSQL交互式开发平台
Dlink On Yarn 三种 Flink 执行方式的实践
Dlink 在 Hive 的实践
Dlink-0.3.2 新功能 FlinkSQL 自动补全
 

 GitHub 地址 
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/dlink
欢迎大家关注和 Star ~


一、背景

Apache Doris是一个现代化的MPP分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。

实时即未来,本文将分享如何基于 Dlink 实现 Mysql 变动数据通过 Flink 实时入库 Doris。


二、准备

        老规矩,列清各组件版本:
组件
版本
Flink
1.13.3
Flink-mysql-cdc
2.1.0
Doris
0.15.1-rc09
doris-flink
1.0-SNAPSHOT
Mysql
8.0.13
Dlink
0.4.0
        需要注意的是,本文的 Doris 是基于 OracleJDK1.8 和 Scala 2.11 通过源码进行编译打包的,所以所有组件的 scala 均为 2.11,此处和 Doris 社区略有不同。


三、部署

本文的 Flink 和 Doris 的部署不做描述,详情请查阅官网。

https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html#%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95

本文在 Dlink 部署成功的基础上进行,如需查看具体部署步骤,请阅读《flink sql 知其所以然(十六):flink sql 开发企业级利器之 Dlink》。

Dlink 的 plugins 下添加 doris-flink-1.0-SNAPSHOT.jar
flink-sql-connector-mysql-cdc-2.1.0.jar
。重启 Dlink。

plugins/ -- Flink 相关扩展
|- doris-flink-1.0-SNAPSHOT.jar
|- flink-csv-1.13.3.jar
|- flink-dist_2.11-1.13.3.jar
|- flink-format-changelog-json-2.1.0.jar
|- flink-json-1.13.3.jar
|- flink-shaded-zookeeper-3.4.14.jar
|- flink-sql-connector-mysql-cdc-2.1.0.jar
|- flink-table_2.11-1.13.3.jar
|- flink-table-blink_2.11-1.13.3.jar

当然,如果您想直接使用 FLINK_HOME 的话,可以在 auto.sh
文件中 SETTING
变量添加$FLINK_HOME/lib


四、数据表

学生表 (student)

-- Mysql
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student`  (
 `sid` int(11) NOT NULL,
 `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
 PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `student` VALUES (1, '小红');
INSERT INTO `student` VALUES (2, '小黑');
INSERT INTO `student` VALUES (3, '小黄');

成绩表(score)

-- Mysql
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score`  (
 `cid` int(11) NOT NULL,
 `sid` int(11) NULL DEFAULT NULL,
 `cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
 `score` int(11) NULL DEFAULT NULL,
 PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 95);
INSERT INTO `score` VALUES (3, 1, 'english', 93);
INSERT INTO `score` VALUES (4, 2, 'chinese', 92);
INSERT INTO `score` VALUES (5, 2, 'math', 75);
INSERT INTO `score` VALUES (6, 2, 'english', 80);
INSERT INTO `score` VALUES (7, 3, 'chinese', 100);
INSERT INTO `score` VALUES (8, 3, 'math', 60);

学生成绩宽表(scoreinfo)

-- Doris
CREATE TABLE scoreinfo
(
  cid INT,
  sid INT,
  name VARCHAR(32),
  cls VARCHAR(32),
  score INT
)
UNIQUE KEY(cid)
DISTRIBUTED BY HASH(cid) BUCKETS 10
PROPERTIES("replication_num" = "1");


五、FlinkSQL

CREATE TABLE student (
  sid INT,
  name STRING,
   PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'student');
CREATE TABLE score (
  cid INT,
  sid INT,
  cls STRING,
  score INT,
   PRIMARY KEY (cid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'score');
CREATE TABLE scoreinfo (
  cid INT,
  sid INT,
  name STRING,
  cls STRING,
  score INT,
   PRIMARY KEY (cid) NOT ENFORCED
) WITH (      
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030' ,
'table.identifier' = 'test.scoreinfo',
'username' = 'root',
'password'=''
);
insert into scoreinfo
select
a.cid,a.sid,b.name,a.cls,a.score
from score a
left join student b on a.sid = b.sid


六、调试

在 Dlink 中提交

本示例采用了 yarn-session 的方式进行提交。

FlinkWebUI

上图可见,流任务已经成功被 Dlink 提交的远程集群。

Doris 查询

上图可见,Doris 已经被写入了历史全量数据。

增量测试

在 Mysql 中执行新增语句:

INSERT INTO `score` VALUES (9, 3, 'english', 100);

Doris 成功被追加:

变动测试

在 Mysql 中执行新增语句:

update score set score = 100 where cid = 1

Doris 成功被修改:



七、总结

       Apache Flink 强大的流计算能力与 Apache Doris 的查询能力为企业内实时数仓或实时指标实现提供了另一种高效的选择方式,与此同时 Flink CDC 也更进一步降低整个实时链路的成本。


八、未来

本文只是讲述 Dlink 集成 CDC 与 Doris 的步骤和示例,后续将分享 Dlink 在数据中台中如何发挥效能来建设批流一体的数据仓库的实践与优化,敬请关注。

Dlink 将紧跟 Flink 官方社区发展,为推广及发展 Flink 的应用而奋斗,打造 FlinkSQL 的最佳搭档的形象。


九、交流

欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。

QQ社区群:543709668,申请备注 “ Dlink ”,不写不批

微信社区群(推荐,大佬云集):申请添加 wenmo_ai,邀请您进群,申请备注 “Dlink”

       公众号:DataLink数据中台



扫描二维码获取

更多精彩

DataLink

数据中台




文章转载自DataLink数据中台,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论