
[Worth Bookmarking] MySQL + Flink CDC + Doris Data Synchronization in Practice

857Hub 2022-09-30

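1. Business Requirements and Pain Points

Many of the company's business requirements need the latest state of an entity: the latest state of a vehicle, a charging pile, a report, or a verification task. All detailed business data is stored in Doris, but the set of latest states cannot be read from it directly.

Stage 1: Under protocols such as GB4403 and GB27930, data is allowed to arrive up to 7 days late. Any SQL computation therefore has to read the most recent 7 days of data. At an average of 10 million rows per day, a single computation scans roughly 70 million rows. We created a latest-status table, wrote the SQL result set into it, and scheduled the job with DolphinScheduler. Because the latest state has to be fairly fresh, the schedule interval was normally set to 1 minute.

Pain points:

①: Freshness is bounded by the schedule interval; however short the interval is, the result is never truly real time.

②: Frequent repeated computation wastes a large amount of compute resources.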

insert into the_monitor_latest_status
select vin, daq_time, province, city, district, odo, cha_state, op_mode, op_state, soc, curr, volt, lat, lng
from
    (select vin, daq_time, province, city, district, odo, cha_state, op_mode, op_state, soc, curr, volt, lat, lng,row_number() over (partition by vin order by daq_time desc)ro
     from ods_monitordata
     where daq_time >= date_format(date_sub(current_date(), interval 7 day), '%Y-%m-%d 00:00:00') and odo != 0 and province != 'unknown') t1
where ro = 1;
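
The query keeps, for each vin, only the row with the greatest daq_time. The same selection can be sketched on a small CSV sample with standard tools. The function name latest_per_key and the sample rows are made up for illustration, and the input is assumed to be `vin,daq_time,...` with timestamps in a sortable `YYYY-MM-DD HH:MM:SS` format.

```shell
# latest_per_key: read "vin,daq_time,rest" rows on stdin and print, for each
# vin, the row with the greatest daq_time (the row that ro = 1 selects).
latest_per_key() {
    # Sort by vin, then by daq_time descending, and keep the first row per vin.
    LC_ALL=C sort -t ',' -k1,1 -k2,2r | awk -F ',' '!seen[$1]++'
}

# Hypothetical sample data: two readings for VIN_A, one for VIN_B.
printf '%s\n' \
    'VIN_A,2022-09-01 10:00:00,soc=40' \
    'VIN_B,2022-09-02 08:30:00,soc=75' \
    'VIN_A,2022-09-03 09:15:00,soc=55' |
    latest_per_key
```

Every scheduled run repeats this sort over the whole 7-day window, which is where the wasted computation in pain point ② comes from.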

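Stage 2:

Pain points:

①: High development cost: every table needs its own program.

Requirements and pain points for MySQL external tables:

Many of the business systems' tables live in MySQL. The large tables (high data volume) are synchronized into Doris. Small dimension tables do not need to be synchronized; they can be mounted in Doris as external tables instead. Creating external tables is tedious, though: they can only be created by hand, one at a time, and whenever a table's structure changes in MySQL the external table has to be rebuilt.

Pain points:

①: Creating external tables by hand is tedious, for example when all 100 tables have to be created manually.

②: Any change to a MySQL table's structure requires recreating the external table.

2. mysql_to_doris Structure

The tool implements the optimizations above and has the following advantages:

  • Written in shell, extremely lightweight, open source and ready to use
  • Pure SQL syntax with zero development cost, which fits the current business scenario particularly well
  • Simple configuration gives fully automated processing

Architecture diagram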

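mysql_to_doris/
├── bin
│   ├── auto.sh  --Flink job startup script
│   ├── create_doris.sh  --generates the Flink CREATE TABLE statements that map Doris tables
│   ├── create_mysql.sh  --generates the Flink CREATE TABLE statements that map MySQL tables
│   ├── e_auto.sh  --external table execution script
│   ├── e_mysql_to_doris.sh  --external table DDL generation script
│   ├── flinksql.sh  --Flink job statement generation script
│   └── insert_into.sh  --insert into statement generation script
├── conf
│   ├── doris
│   │   ├── doris.conf  --Doris connection settings
│   │   ├── flink.conf  --Flink-specific options
│   │   └── tables  --sink-side database.table names
│   ├── e_mysql
│   │   ├── doris.conf  --external table connection settings
│   │   ├── doris_tables  --external table database.table names (user-defined)
│   │   ├── mysql.conf  --external table connection settings
│   │   └── mysql_tables  --source table database.table names
│   ├── flink
│   │   └── flink_conf  --Flink settings
│   └── mysql
│       ├── flink.conf  --Flink-specific options
│       ├── mysql.conf  --MySQL connection settings
│       └── tables  --source-side database.table names
└── lib
    ├── doris_to_flink.sh  --converts a Doris table structure to a Flink table
    ├── mysql_to_doris.sh  --converts a MySQL table structure to a Doris external table
    └── mysql_to_flink.sh  --converts a MySQL table structure to a Flink table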

Code flow:

1. Get the CREATE TABLE statements

for table in $(cat ../conf/e_mysql/mysql_tables |grep -v '#' | awk -F '\n' '{print $1}')
        do
        echo "show create table ${table};" |mysql -h$mysql_host -uroot -p$mysql_password  >> $path
done
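
As a minimal sketch of this step without a live MySQL server, the table list can be turned into `show create table` statements first and piped to the `mysql` client afterwards. The function names below are hypothetical, and unlike the loop above, which drops every line containing `#`, this version only drops lines that start with `#` and blank lines.

```shell
# list_tables FILE: print the db.table entries, skipping blank lines and
# lines that start with '#'.
list_tables() {
    grep -v '^[[:space:]]*#' "$1" | grep -v '^[[:space:]]*$'
}

# build_show_statements FILE: print one "show create table" statement per entry.
build_show_statements() {
    list_tables "$1" | while IFS= read -r table; do
        printf 'show create table %s;\n' "$table"
    done
}
```

Something like `build_show_statements ../conf/e_mysql/mysql_tables | mysql -h"$mysql_host" -uroot -p"$mysql_password"` would then send all statements over a single connection instead of one connection per table.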

2. Adjust the format

awk -F '\t' '{print $2}' $path |awk '!(NR%2)' |awk '{print $0 ";"}' > ../result/tmp111.sql
sed -i 's/\\n/\n/g' ../result/tmp111.sql
sed -n '/CREATE TABLE/,/ENGINE\=/p' ../result/tmp111.sql > ../result/tmp222.sql
##delete tables special struct
sed -i '/^  CON/d' ../result/tmp222.sql
sed -i '/^  KEY/d' ../result/tmp222.sql
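
To see what the filtering does, here is a small sketch that applies the same range extraction and line deletions to DDL read from stdin instead of editing files in place. The function name extract_table_body and the sample DDL are illustrative only.

```shell
# extract_table_body: keep the lines from CREATE TABLE through the ENGINE=
# line, then drop index and constraint lines, as the sed commands above do.
extract_table_body() {
    sed -n '/CREATE TABLE/,/ENGINE=/p' | sed -e '/^  CON/d' -e '/^  KEY/d'
}

# Hypothetical sample: one secondary index that the filter removes.
printf '%s\n' \
    'CREATE TABLE `t` (' \
    '  `ID` varchar(32) NOT NULL,' \
    '  PRIMARY KEY (`ID`),' \
    '  KEY `idx` (`ID`)' \
    ') ENGINE=InnoDB;' |
    extract_table_body
```

Note that deleting the last `KEY` line leaves a trailing comma on the line before it, so a later step still has to clean that up before the DDL is valid.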

3. Splice in the Doris information

sed -i '/ENGINE=/a) ENGINE=ODBC\n COMMENT "ODBC"\nPROPERTIES (\n"host" = "ApacheDorisHostIp",\n"port" = "3306",\n"user" = "root",\n"password" = "ApacheDorisHostPassword",\n"database" = "ApacheDorisDataBases",\n"table" = "ApacheDorisTables",\n"driver" = "MySQL",\n"odbc_type" = "mysql");' $path
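
The `a` command with `\n` in the appended text relies on GNU sed. A portable sketch of the same idea, assuming the MySQL `ENGINE=` line should be replaced by the ODBC clause, can be written with awk. The function name to_odbc_ddl and its argument order are hypothetical, and only the final clause is handled here.

```shell
# to_odbc_ddl HOST PORT USER PASSWORD DATABASE TABLE
# Read a CREATE TABLE statement on stdin and replace its ENGINE= line with
# an ODBC external table clause built from the arguments.
to_odbc_ddl() {
    awk -v host="$1" -v port="$2" -v user="$3" -v pass="$4" -v db="$5" -v tbl="$6" '
        /ENGINE=/ {
            print ") ENGINE=ODBC"
            print "COMMENT \"ODBC\""
            print "PROPERTIES ("
            printf "\"host\" = \"%s\",\n", host
            printf "\"port\" = \"%s\",\n", port
            printf "\"user\" = \"%s\",\n", user
            printf "\"password\" = \"%s\",\n", pass
            printf "\"database\" = \"%s\",\n", db
            printf "\"table\" = \"%s\",\n", tbl
            print "\"driver\" = \"MySQL\","
            print "\"odbc_type\" = \"mysql\");"
            next
        }
        { print }
    '
}
```

One caveat: awk interprets backslash escapes in `-v` values, so a password containing backslashes would need extra care.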

3. Components Involved:

  • Flink CDC version: 2.2.1
  • Doris Flink Connector version: 1.14_2.12-1.0.0
  • Flink version: 1.14.5
  • Hadoop version: 3.1.3
  • Doris version: 1.1.1
  • MySQL ODBC version: 5.3.13
Link: https://pan.baidu.com/s/1eMML1Km-VYa01SRQaGuwBQ
Extraction code: yyds

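What is CDC

CDC is short for Change Data Capture.

The core idea is to monitor and capture changes in a database (including INSERT, UPDATE, and DELETE operations on data or tables), record those changes completely in the order in which they occur, and write them to message middleware so that other services can subscribe to and consume them.

CDC has a wide range of application scenarios, including:

  • Data distribution: distributing one data source to multiple downstream consumers, commonly used for business decoupling and microservices.
  • Data integration: integrating scattered, heterogeneous data sources into a data warehouse, eliminating data silos and simplifying later analysis.
  • Data migration: commonly used for database backup and disaster recovery.

What is Apache Doris

Apache Doris is a modern MPP analytical database. It returns query results with sub-second response times and effectively supports real-time data analysis. Its distributed architecture is simple and easy to operate, and it can support very large data sets of more than 10 PB.

Apache Doris serves a variety of analysis needs, such as fixed historical reports, real-time data analysis, interactive data analysis, and exploratory data analysis, making data analysis simpler and more efficient.

What is the Flink Doris Connector

The Flink Doris Connector is an extension provided by the Doris community so that users can read and write Doris tables with Flink. It makes it possible to write data into Doris from Flink in real time. Before the Flink Doris Connector, irregular business data often had to be normalized first (for example, by filtering out null values) and written to a new topic, after which a Routine Load job was started to load it into Doris. With the Flink Doris Connector, Flink can read from Kafka and write directly to Doris.

What is Doris On ODBC

ODBC External Table Of Doris lets Doris access external tables through ODBC, the standard database access interface. External tables remove the tedious data import step, give Doris the ability to access many kinds of databases, and use Doris's own OLAP capabilities to analyze the data in external tables:

  1. Supports connecting various data sources to Doris
  2. Supports joint queries between Doris tables and tables in various data sources for more complex analysis
  3. Writes query results computed by Doris to an external data source via insert into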

4. Preparation:

Installation and deployment

MySQL:

①:wget http://dev.mysql.com/get/mysql57-community-release-el7-8.noarch.rpm

②:yum -y localinstall mysql57-community-release-el7-8.noarch.rpm

③:yum -y install mysql-community-server --nogpgcheck

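Doris:

Install and deploy Doris:

①: Upload the tar package

②: Set the IP and ports in fe.conf

③: Set the IP and ports in be.conf

④: Start the FE and add the BE nodes

Prerequisites

Flink on YARN

①: Install and deploy Flink

②: Set the HADOOP_CLASSPATH environment variable in /etc/profile.d/my.sh

③: Set FLINK_HOME in /etc/profile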


  export HADOOP_CLASSPATH=`hadoop classpath`

Flink Doris Connector

Upload the Flink Doris Connector to Flink's lib directory

Flink CDC

Upload Flink CDC to Flink's lib directory

MySQL ODBC

①: Extract the tar package

②: Copy the files to /usr/lib64 on the Linux host

 cp lib/* /usr/lib64

③: Run the installer in the bin directory

./myodbc-installer -d -a -n "MySQL ODBC 5.3 Driver" -t "DRIVER=/usr/lib64/libmyodbc5w.so;SETUP=/usr/lib64/libmyodbc5w.so"

④: Edit the BE's odbcinst.ini configuration file and distribute it to all BE nodes

5. Feature Demonstration

MySQL external table synchronization:

  • Create the tables in MySQL:

    mysql -uroot -proot;

    create database t_demo;
    use t_demo;
    CREATE TABLE `t_cickp_charge_connector` (
      `ID` varchar(32) NOT NULL COMMENT 'Primary key',
      `E_ID` varchar(32) NOT NULL COMMENT 'ID of the associated charging equipment row',
      `CONNECTOR_ID` varchar(26) NOT NULL,
      `CONNECTOR_NAME` varchar(30) DEFAULT NULL,
      `CONNECTOR_TYPE` smallint DEFAULT NULL,
      `VOLTAGE_UPPER_LIMIT` int DEFAULT NULL,
      `VOLTAGE_LOWER_LIMIT` int DEFAULT NULL,
      `CONNECTOR_CURRENT` int DEFAULT NULL,
      `CONNECTOR_POWER` decimal(19,10) DEFAULT NULL,
      `PARK_NO` varchar(10) DEFAULT NULL,
      `VOLTAGE` int DEFAULT NULL,
      `BMS_POWER_TYPE` smallint DEFAULT NULL,
      `CREATE_TIME` datetime DEFAULT NULL,
      `UPDATE_TIME` datetime DEFAULT NULL,
      PRIMARY KEY (`ID`),
      KEY `idx_equipment_id` (`E_ID`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='Charging connector table';

    insert into t_demo.t_cickp_charge_connector (ID, E_ID, CONNECTOR_ID, CONNECTOR_NAME, CONNECTOR_TYPE, VOLTAGE_UPPER_LIMIT, VOLTAGE_LOWER_LIMIT, CONNECTOR_CURRENT, CONNECTOR_POWER, PARK_NO, VOLTAGE, BMS_POWER_TYPE, CREATE_TIME, UPDATE_TIME)
    values  ('000256eb359d470082b20ad4a4edf88e', '207f80bec5e14c27b38042d46930282d', '000000001001132__0', '充电枪01', 3, 750, 220, 112, 84.0000000000, '', 0, 3, '2022-08-22 10:50:12', null),
            ('00081df6bbeb4463bc945bde3076b1b0', 'd619e677456f4bf8998e241dae28692b', 'TS1704140074001', 'TS1704140074', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:04', null),
            ('0010e36ba5b6432b812901512d2767b2', 'd7cd94fa66944c80951997ac81b9f53f', 'HE121118010071001', 'HE121118010071', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:43', null),
            ('001a672ca25f45deab52deae778a98e1', '3bc1663886a942febf04cb39ea8ea803', '500085001', '500085A', 4, 0, 0, 200, 120.0000000000, '', 750, 1, '2021-10-12 16:05:53', null),
            ('001cca79ea2f4ac7a73e830165ceb0cd', '02823a48b93047dd955839f34948084b', 'HE121118062736_1001', 'HE121118062736_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:29:56', null),
            ('002f30773c674d32960ae5ec1230ee16', 'bc296f5e53c24594b839ebea34ff565f', 'HE121118040817_1001', 'HE121118040817_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:29:52', null),
            ('003b630113fb45f6ad52337ad4e21619', '0313985f1b274a50a861a29287776551', 'HE121122010801001', 'HE121122010801', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:19', null),
            ('003dc6e54c60427cb5a1003de4504877', '178de38f88454d85bc3fd36fb2c63c28', 'GP1804280341001', 'GP1804280341', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:25', null),
            ('003e884f521442b4ab9bf050395f0b7f', 'fc9b29532bfe4cc9a10647f4ed2ea7e2', 'HE121118063280_1001', 'HE121118063280_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:44', null),
            ('004e481d713242c1b49026b45042809a', '3949d30b22e7414b953bca61ee973476', 'HE121121050028001', 'HE121121050028', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:44', null);


    CREATE TABLE `t_cickp_charge_equipment` (
      `ID` varchar(32) NOT NULL COMMENT 'Primary key',
      `S_ID` varchar(32) NOT NULL COMMENT 'ID of the associated charging station row',
      `EQUIPMENT_ID` varchar(24) NOT NULL,
      `MANUFACTURER_ID` varchar(10) DEFAULT NULL,
      `EQUIPMENT_MODEL` varchar(20) DEFAULT NULL,
      `PRODUCTION_DATE` varchar(10) DEFAULT NULL,
      `EQUIPMENT_TYPE` smallint DEFAULT NULL,
      `EQUIPMENT_LNG` decimal(19,6) DEFAULT NULL,
      `EQUIPMENT_LAT` decimal(19,6) DEFAULT NULL,
      `EQUIPMENT_NAME` varchar(30) DEFAULT NULL,
      `EQUIPMENT_TOTAL_POWER` decimal(19,1) DEFAULT NULL,
      `MANUFACTURER_NAME` varchar(30) DEFAULT NULL,
      `EQUIPMENT_ORDER` varchar(255) DEFAULT NULL,
      `EQUIPMENT_STATUS` smallint DEFAULT NULL,
      `EQUIPMENT_POWER` decimal(19,1) DEFAULT NULL,
      `NEW_NATIONAL_STANDARD` smallint DEFAULT NULL,
      `CREATE_TIME` datetime DEFAULT NULL,
      `UPDATE_TIME` datetime DEFAULT NULL,
      `ACCURACY_LEVEL` smallint DEFAULT NULL,
      `CHECK_TIME_LAST` varchar(30) DEFAULT NULL,
      `CHECK_TIME_NEXT` varchar(30) DEFAULT NULL,
      `CERTIFICATE` varchar(255) DEFAULT NULL,
      `start_using_date` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT 'Date put into use',
      `put_into_use` datetime(3) DEFAULT NULL COMMENT 'Date put into use',
      PRIMARY KEY (`ID`),
      KEY `idx_station_id` (`S_ID`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='Charging pile table';

    insert into t_demo.t_cickp_charge_equipment (ID, S_ID, EQUIPMENT_ID, MANUFACTURER_ID, EQUIPMENT_MODEL, PRODUCTION_DATE, EQUIPMENT_TYPE, EQUIPMENT_LNG, EQUIPMENT_LAT, EQUIPMENT_NAME, EQUIPMENT_TOTAL_POWER, MANUFACTURER_NAME, EQUIPMENT_ORDER, EQUIPMENT_STATUS, EQUIPMENT_POWER, NEW_NATIONAL_STANDARD, CREATE_TIME, UPDATE_TIME, ACCURACY_LEVEL, CHECK_TIME_LAST, CHECK_TIME_NEXT, CERTIFICATE, start_using_date, put_into_use)
    values  ('0014d314ec694faead804302efabfeee', '4392ed2f04154473b9f8b91c2a938741', 'HE121121050250', '360051856', 'ZL30-A6', '2021-10-08', 2, 113.905457, 22.771673, 'HE121121050250', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:24', null, null, null, null, null, null, null),
            ('001e0b7c5dc7436fb5479179080983f5', 'bf036a0583334522acab7b2f33444608', 'HE121119050138', '360051856', 'ZL30-A6', '2020-05-18', 2, 114.322876, 22.693771, 'HE121119050138', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:29', null, null, null, null, null, null, null),
            ('002c8597b9b241978b0a2d700e32cfae', '3465e6c1098a41d9893aa8b52727798d', '500020', 'MA5DRRDX1', 'ZDDC120BG', null, 1, 0.000000, 0.000000, '17', 120.0, '深圳智电新能源科技有限公司', null, 50, 120.0, 2, '2021-10-12 16:01:34', '2022-06-20 12:12:43', null, null, null, null, null, null),
            ('008b368751c643219da292fd76d569f1', '65ce25d50c1642799bd97759da568373', '000000001046001', 'MA5DA0053', 'CL5823', '2017-06-02', 1, 114.352464, 22.711021, '1号桩', 120.0, '深圳车电网', null, 50, 120.0, 2, '2022-08-22 10:50:06', null, null, null, null, null, null, null),
            ('009cb3f213384199b9d1816fb6900dcc', '1713051024a74b15874ffabd0411890c', '000000001063015', 'MA5DA0053', 'CL5899', '2021-11-27', 2, 113.920228, 22.535621, '交流充电桩', 7.0, '深圳市车电网络有限公司', null, 50, 7.0, 2, '2022-05-30 11:19:17', '2022-06-14 16:32:47', null, null, null, null, null, null),
            ('00ba38325d0e4804960bbb591f0f69e1', '7a69943d322a411e92132ca6989c55f7', 'HE121121052645', '360051856', 'ZL30-A6', '2022-01-10', 2, 113.909195, 22.601933, 'HE121121052645', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:04', null, null, null, null, null, null, null),
            ('00c25458804446d8b2fad1d007c1c812', 'cd14b93718ee468cbeccf1e84a2583cb', 'HE121119052764', '360051856', 'ZL30-A6', '2019-07-26', 2, 114.049232, 22.701670, 'HE121119052764', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:29:55', null, null, null, null, null, null, null),
            ('00d113034d9b4067bd12afbea03a1b52', '1c0a1b912d974a828ba82c620e0bba26', 'HE121120041074', '360051856', 'ZL30-A6', '2021-05-12', 2, 113.946404, 22.498718, 'HE121120041074', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:15', null, null, null, null, null, null, null),
            ('00d61d48e8004df095eb74985258d493', '8b7214a83fd74cd1bce14ace5e129107', 'HE121119030713_1', '360051856', 'ZL30-A6', '2019-05-15', 2, 113.883667, 22.788374, 'HE121119030713_1', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:29:57', null, null, null, null, null, null, null),
            ('00d70d75e8ce44ba9eed370f67432bff', '4f52fcd099f547db8b44421d22d53dec', 'TS1704250011', '360051856', 'JL7-A2-TS', '2017-07-17', 2, 113.930763, 22.523027, 'TS1704250011', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:11', null, null, null, null, null, null, null);
            
    create database p_demo;
    use p_demo;
    CREATE TABLE `p_inspection_task` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'Verification task ID',
      `name` varchar(63) DEFAULT NULL COMMENT 'Task name',
      `type` int DEFAULT NULL COMMENT 'Task type (0: designated verification, 1: random sampling of piles, 2: double-random sampling)',
      `task_issue_date` datetime DEFAULT NULL COMMENT 'Task issue date',
      `status` int DEFAULT '0' COMMENT 'Status (0: new, 1: issued, 2: in progress, 3: completed)',
      `task_achieve_time` datetime DEFAULT NULL COMMENT 'Actual completion time',
      `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
      `update_time` datetime DEFAULT NULL COMMENT 'Update time',
      `deadline` datetime DEFAULT NULL COMMENT 'Planned completion time',
      `charger_platform_task` bigint DEFAULT NULL COMMENT 'Task ID issued by the charging pile platform',
      `creator` int DEFAULT NULL COMMENT 'Creator',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=178 DEFAULT CHARSET=utf8mb3 COMMENT='Verification task table';

    insert into p_demo.p_inspection_task (id, name, type, task_issue_date, status, task_achieve_time, create_time, update_time, deadline, charger_platform_task, creator)
    values  (1, '任务1', 0, '2022-06-22 10:44:14', 0, '2022-06-23 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (2, '任务2', 0, '2022-06-22 11:07:24', 0, '2022-06-09 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (3, '任务3', 1, '2022-06-22 11:19:23', 0, '2022-06-01 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (4, '预警检定任务-1', 0, '2022-06-22 14:07:23', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (5, '预警检定-2', 0, '2022-06-22 14:09:47', 0, '2022-06-29 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (6, '预警检定任务-1', 0, '2022-06-22 14:22:03', 0, '2022-06-10 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (7, '预警检定任务-2', 0, '2022-06-22 14:22:28', 2, '2022-06-29 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (8, '预警检定任务XX', 0, '2022-06-23 10:43:59', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (9, 'A任务1', 0, '2022-06-23 10:56:23', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
            (10, 'A任务2', 1, '2022-06-23 11:06:10', 0, '2022-06-28 00:00:00', '2022-07-13 16:34:32', null, null, null, null);
            
            
    CREATE TABLE `p_inspection_result_record` (
      `id` bigint NOT NULL AUTO_INCREMENT,
      `inspect_task_detail_id` varchar(255) NOT NULL COMMENT 'Verification task detail ID',
      `looks_check` int DEFAULT NULL COMMENT 'Appearance check (0: fail, 1: pass)',
      `insulation_resistance_check` int DEFAULT NULL COMMENT 'Insulation resistance (0: fail, 1: pass)',
      `work_temp_amend_value` varchar(16) DEFAULT NULL COMMENT 'Temperature correction value for working error',
      `accuracy_level` varchar(16) DEFAULT NULL COMMENT 'Accuracy level for working error',
      `value_temp_amend_value` varchar(16) DEFAULT NULL COMMENT 'Temperature correction value for indication error',
      `value_errors_check` int DEFAULT NULL COMMENT 'Indication error check (0: fail, 1: pass)',
      `pay_errors_check` int DEFAULT NULL COMMENT 'Payment amount error check (0: fail, 1: pass)',
      `clock_errors` int DEFAULT NULL COMMENT 'Clock error check (0: fail, 1: pass)',
      `conclusion` int DEFAULT NULL COMMENT 'Conclusion (0: fail, 1: pass)',
      `check_time` datetime DEFAULT NULL COMMENT 'Verification time',
      `report_state` int DEFAULT NULL COMMENT 'Report approval status (0: rejected, 1: approved, 2: in progress)',
      `inspect_report` varchar(255) DEFAULT NULL COMMENT 'Verification report',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=57 DEFAULT CHARSET=utf8mb3 COMMENT='Verification result record';

    insert into p_demo.p_inspection_result_record (id, inspect_task_detail_id, looks_check, insulation_resistance_check, work_temp_amend_value, accuracy_level, value_temp_amend_value, value_errors_check, pay_errors_check, clock_errors, conclusion, check_time, report_state, inspect_report)
    values  (1, '10000', 1, 1, '0', '1', '0', 0, 1, 1, null, '2022-07-16 14:20:59', 2, null),
            (2, '44444', 1, null, null, null, null, 1, 1, 1, null, '2022-09-17 15:37:24', null, null),
            (3, '3713841537fc462d8304e3ec60cf803a', null, null, null, null, null, null, null, null, null, '2022-08-02 08:00:00', null, null),
            (4, '537b3970e78441f097a8c6b42dbebdef', null, null, null, null, null, null, null, null, null, '2022-08-03 08:00:00', null, null),
            (5, '319d093d14c44c90810ed90e179d8ec6', 0, 1, '121', '1', '133', 1, 0, 1, null, '2022-11-09 08:00:00', null, null),
            (6, '7271e97cd6cd4472b30f881170ae0d4d', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
            (7, 'f2522ac4834546e7aaf31630631ab53b', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
            (8, 'fc52a61adc174d7b95de600f7033a336', null, null, null, null, null, null, null, null, null, '2022-08-04 08:00:00', null, null),
            (9, '4a2d79a855374c098a7a4704077e7b73', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
            (10, 'e95419eb804f438aa43ef31138cc282e', null, null, null, null, null, null, null, null, null, '2022-08-02 08:00:00', null, null);

  • Configuration files

    doris.conf
    doris_tables
    mysql.conf
    mysql_tables

    mysql_tables

t_demo.t_cickp_charge_connector
t_demo.t_cickp_charge_equipment
p_demo.p_inspection_task
p_demo.p_inspection_result_record

doris_tables

demo.demo1
demo.demo2
demo.demo3
demo2.demo

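Demonstrate the multi-database, multi-table case from IDEA;

Drop a column and change a data type;

Drop a column:
alter table t_demo.t_cickp_charge_connector drop column CONNECTOR_NAME;
Change a data type:
alter table p_demo.p_inspection_result_record modify column looks_check decimal(4,2);

Demonstrate metadata monitoring and how it works;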

#!/bin/bash
source ../conf/e_mysql/doris.conf
echo "source ../result/e_mysql_to_doris.sql;" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
while (( 1 == 1 ))
do
sleep 60
sh ./e_mysql_to_doris.sh ../result/new_e_mysql_to_doris.sql
old=`md5sum ../result/e_mysql_to_doris.sql |awk -F ' ' '{print $1}'`
new=`md5sum ../result/new_e_mysql_to_doris.sql |awk -F ' ' '{print $1}'`
        if [[ $old != $new ]];then
        x=0
                for table in $(cat ../conf/e_mysql/doris_tables |grep -v '#' | awk -F '\n' '{print $1}')
                do
                       echo "drop table if exists ${table};" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
                done
                echo "source ../result/new_e_mysql_to_doris.sql;" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
                rm -rf ../result/e_mysql_to_doris.sql
                mv ../result/new_e_mysql_to_doris.sql ../result/e_mysql_to_doris.sql
                fi
        rm -f ../result/new_e_mysql_to_doris.sql
done
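
The core of the loop is a change check: regenerate the DDL file, compare it with the previous one, and rebuild the external tables only when they differ. A minimal sketch of that check follows. It uses the POSIX `cksum` utility instead of `md5sum` so that it also runs where `md5sum` is not installed, and the function names are hypothetical.

```shell
# content_sum FILE: print a checksum of the file contents (CRC and byte count).
content_sum() {
    cksum < "$1"
}

# schema_changed OLD NEW: succeed (exit status 0) when the two files differ.
schema_changed() {
    [ "$(content_sum "$1")" != "$(content_sum "$2")" ]
}
```

In the monitoring loop this would read `if schema_changed ../result/e_mysql_to_doris.sql ../result/new_e_mysql_to_doris.sql; then ... fi`. Because the comparison covers the whole generated file, a change to any one MySQL table triggers a drop and recreate of every external table in doris_tables.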


Flink CDC real-time synchronization:

Create the tables in MySQL:

CREATE DATABASE emp_1;
 USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26'),
(10021,'1960-02-20','Ramzi','Erde','M','1988-02-10'),
(10022,'1952-07-08','Shahaf','Famili','M','1995-08-22'),
(10023,'1953-09-29','Bojan','Montemayor','F','1989-12-17'),
(10024,'1958-09-05','Suzette','Pettey','F','1997-05-19'),
(10025,'1958-10-31','Prasadram','Heyers','M','1987-08-17'),
(10026,'1953-04-03','Yongqiao','Berztiss','M','1995-03-20'),
(10027,'1962-07-10','Divier','Reistad','F','1989-07-07'),
(10028,'1963-11-26','Domenick','Tempesti','M','1991-10-22'),
(10029,'1956-12-13','Otmar','Herbst','M','1985-11-20'),
(10030,'1958-07-14','Elvis','Demeyer','M','1994-02-17'),
(10031,'1959-01-27','Karsten','Joslin','M','1991-09-01'),
(10032,'1960-08-09','Jeong','Reistad','F','1990-06-20'),
(10033,'1956-11-14','Arif','Merlo','M','1987-03-18'),
(10034,'1962-12-29','Bader','Swan','M','1988-09-21'),
(10035,'1953-02-08','Alain','Chappelet','M','1988-09-05'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');

CREATE TABLE employees_2 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_2` VALUES (10037,'1963-07-22','Pradeep','Makrucki','M','1990-12-05'),
(10038,'1960-07-20','Huan','Lortz','M','1989-09-20'),
(10039,'1959-10-01','Alejandro','Brender','M','1988-01-19'),
(10040,'1959-09-13','Weiyi','Meriste','F','1993-02-14'),
(10041,'1959-08-27','Uri','Lenart','F','1989-11-12'),
(10042,'1956-02-26','Magy','Stamatiou','F','1993-03-21'),
(10043,'1960-09-19','Yishay','Tzvieli','M','1990-10-20'),
(10044,'1961-09-21','Mingsen','Casley','F','1994-05-21'),
(10045,'1957-08-14','Moss','Shanbhogue','M','1989-09-02'),
(10046,'1960-07-23','Lucien','Rosenbaum','M','1992-06-20'),
(10047,'1952-06-29','Zvonko','Nyanchama','M','1989-03-31'),
(10048,'1963-07-11','Florian','Syrotiuk','M','1985-02-24'),
(10049,'1961-04-24','Basil','Tramer','F','1992-05-04'),
(10050,'1958-05-21','Yinghua','Dredge','M','1990-12-25'),
(10051,'1953-07-28','Hidefumi','Caine','M','1992-10-15'),
(10052,'1961-02-26','Heping','Nitsch','M','1988-05-21'),
(10053,'1954-09-13','Sanjiv','Zschoche','F','1986-02-04'),
(10054,'1957-04-04','Mayumi','Schueller','M','1995-03-13');


CREATE DATABASE emp_2;

USE emp_2;

CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);


INSERT INTO `employees_1` VALUES  (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');


CREATE TABLE employees_2(
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_2` VALUES (10085,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(10086,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(10087,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(10088,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02'),
(10089,'1963-03-21','Sudharsan','Flasterstein','F','1986-08-12'),
(10090,'1961-05-30','Kendra','Hofting','M','1986-03-14'),
(10091,'1955-10-04','Amabile','Gomatam','M','1992-11-18'),
(10092,'1964-10-18','Valdiodio','Niizuma','F','1989-09-22'),
(10093,'1964-06-11','Sailaja','Desikan','M','1996-11-05'),
(10094,'1957-05-25','Arumugam','Ossenbruggen','F','1987-04-18'),
(10095,'1965-01-03','Hilari','Morton','M','1986-07-15'),
(10096,'1954-09-16','Jayson','Mandell','M','1990-01-14'),
(10097,'1952-02-27','Remzi','Waschkowski','M','1990-09-15'),
(10098,'1961-09-23','Sreekrishna','Servieres','F','1985-05-13'),
(10099,'1956-05-25','Valter','Sullins','F','1988-10-18'),
(10100,'1953-04-21','Hironobu','Haraldson','F','1987-09-21'),
(10101,'1952-04-15','Perla','Heyers','F','1992-12-28'),
(10102,'1959-11-04','Paraskevi','Luby','F','1994-01-26'),
(10103,'1953-11-26','Akemi','Birch','M','1986-12-02'),
(10104,'1961-11-19','Xinyu','Warwick','M','1987-04-16'),
(10105,'1962-02-05','Hironoby','Piveteau','M','1999-03-23'),
(10106,'1952-08-29','Eben','Aingworth','M','1990-12-19'),
(10107,'1956-06-13','Dung','Baca','F','1994-03-22'),
(10108,'1952-04-07','Lunjin','Giveon','M','1986-10-02'),
(10109,'1958-11-25','Mariusz','Prampolini','F','1993-06-16'),
(10110,'1957-03-07','Xuejia','Ullian','F','1986-08-22'),
(10111,'1963-08-29','Hugo','Rosis','F','1988-06-19'),
(10112,'1963-08-13','Yuichiro','Swick','F','1985-10-08'),
(10113,'1963-11-13','Jaewon','Syrzycki','M','1989-12-24'),
(10114,'1957-02-16','Munir','Demeyer','F','1992-07-17'),
(10115,'1964-12-25','Chikara','Rissland','M','1986-01-23'),
(10116,'1955-08-26','Dayanand','Czap','F','1985-05-28');

Create the table in Doris:

create database demo;
use demo;
CREATE TABLE all_employees_info (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Configuration files

doris
├── doris.conf
├── flink.conf
└── tables

mysql
├── flink.conf
├── mysql.conf
└── tables

flink
├── flink_conf

Note that the entries in mysql's tables file and doris's tables file correspond one to one, line by line
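
Since the scripts pair the two files by line number, a mismatch in length silently pairs the wrong tables. A small pre-flight check can be sketched as follows. The function names are hypothetical, and entries are assumed to be the non-empty lines that do not contain `#`, matching the `grep -v '#'` used by the scripts.

```shell
# entries FILE: print the non-empty lines that do not contain '#'.
entries() {
    grep -v '#' "$1" | awk 'NF'
}

# pair_tables MYSQL_TABLES DORIS_TABLES: print "source -> sink" per line,
# or fail when the two files do not have the same number of entries.
pair_tables() {
    n_src=$(entries "$1" | awk 'END {print NR}')
    n_dst=$(entries "$2" | awk 'END {print NR}')
    if [ "$n_src" -ne "$n_dst" ]; then
        echo "tables files differ in length: $n_src vs $n_dst" >&2
        return 1
    fi
    i=0
    entries "$1" | while IFS= read -r src; do
        i=$((i + 1))
        dst=$(entries "$2" | sed -n "${i}p")
        printf '%s -> %s\n' "$src" "$dst"
    done
}
```

Running `pair_tables ../conf/mysql/tables ../conf/doris/tables` before generating the Flink SQL prints the mapping that will be used, which makes it easy to spot a shifted line.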


File details

flink.conf under mysql:

```
-- how the binlog is consumed at startup
'scan.startup.mode' = 'initial'
```

flink.conf:

```
-- YARN deployment mode; the user must have Flink on YARN configured
set 'execution.target' = 'yarn-per-job';
-- name of the YARN application
set 'yarn.application.name' = 'flinkjob_database';
-- checkpoint settings
set 'state.backend' = 'filesystem';
set 'state.checkpoints.dir' = 'hdfs:///ck';
set 'execution.checkpointing.interval' = '6000';
set 'execution.checkpointing.tolerable-failed-checkpoints' = '1';
set 'execution.checkpointing.timeout' ='600000';
set 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- resource settings
set 'jobmanager.memory.process.size' = '1600m';
set 'taskmanager.memory.process.size' = '1780m';
set 'taskmanager.memory.managed.size' = '100m';
set 'taskmanager.numberOfTaskSlots' = '1';
```

flink.conf under doris:

```
-- two-phase commit
'sink.properties.two_phase_commit' = 'true'
```



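To start, go to the bin directory and run the scripts to generate the files

Generated file format:

Each MySQL database corresponds to one Flink job and one Flink script file

Each Flink script file consists of a MySQL mapping statement, a Doris mapping statement, and an insert synchronization statement

So the generated files are named

mysql_db.sql
doris_mysqldb.sql
insert_mysqldb.sql
flink_mysqldb.sql

The reasons are as follows:

First, a uniform naming format is easier to understand

Second, if files were named after the Doris database, 10 MySQL databases synchronized into one Doris database would very likely produce only one Flink job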
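
The naming rule can be sketched as a function that lists the files expected for each distinct MySQL database in the source tables file. This assumes that `db` and `mysqldb` in the names above both stand for the MySQL database name; the function name generated_files is hypothetical.

```shell
# generated_files TABLES_FILE: for each distinct MySQL database in the
# db.table list, print the four script names generated for it.
generated_files() {
    grep -v '#' "$1" | awk -F '.' 'NF >= 2 {print $1}' | sort -u |
    while IFS= read -r db; do
        for prefix in mysql doris insert flink; do
            printf '%s_%s.sql\n' "$prefix" "$db"
        done
    done
}
```

For the demo tables, which live in emp_1 and emp_2, this lists eight files, four per database, so two Flink jobs are produced even though both databases write to the single Doris database demo.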

Before testing, prepare the insert into, delete, and update statements

INSERT INTO emp_2.employees_2 VALUES (11,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(22,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(33,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(44,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02');

delete from emp_2.employees_2 where emp_no = 11;
delete from emp_2.employees_2 where emp_no = 22;
delete from emp_2.employees_2 where emp_no = 33;
delete from emp_2.employees_2 where emp_no = 44;

update emp_2.employees_2 set first_name = 'toms' where emp_no = 10091 ;
update emp_2.employees_2 set first_name = 'toms' where emp_no= 10092;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10093;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10094;

Count statements
use emp_1;
select count(1) from employees_1;
select count(1) from employees_2;
use emp_2;
select count(1) from employees_1;
select count(1) from employees_2;

Implementation of the insert side:

for m_table in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}')
        do
        let a++
        m_d=`cat ../conf/mysql/tables |grep -v '#' | awk "NR==$a{print}" |awk -F '.' '{print $1}'`
        d_table=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$a{print}"`
        sed "/$d_table$sink/,/KEY(/{//d;s/ *//;p};d" ../result/doris_$m_d.sql |awk '!x[$0]++' |awk '{print $0}'| awk -F '`' '{print $2}'|awk -F '\n' '{print $1","}' |sed '$s/.$//' > a.c
        ac=`cat a.c`
        m_d=`echo $m_table | awk -F '.' '{print $1}'`
        echo -e "insert into \`$d_table$sink\`\nselect\n${ac}\nfrom\n\`$m_table$src\`\nwhere 1=1;\n\n" >> ../result/insert_$m_d.sql
        rm -rf a.c
done
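
The loop above extracts the column names from the generated Doris mapping and assembles one insert statement per table. Stripped of the file handling, the assembly step can be sketched like this. The function name build_insert is hypothetical, as is the source table name in the usage example, and the column names are wrapped in backticks here, which the original pipeline does not do.

```shell
# build_insert SINK_TABLE SOURCE_TABLE COLUMN...
# Print an "insert into ... select ... from ..." statement in the same
# layout as the generated insert_<db>.sql files.
build_insert() {
    sink=$1
    src=$2
    shift 2
    printf 'insert into `%s`\nselect\n' "$sink"
    first=1
    for col in "$@"; do
        # Separate columns with ",\n" but leave no comma after the last one.
        if [ "$first" -eq 1 ]; then first=0; else printf ',\n'; fi
        printf '`%s`' "$col"
    done
    printf '\nfrom\n`%s`\nwhere 1=1;\n' "$src"
}
```

For example, `build_insert demo.all_employees_info_sink1 emp_1.employees_1_src emp_no birth_date` prints a statement that copies those two columns from the (assumed) source mapping into the sink mapping.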

CREATE TABLE `demo.all_employees_info_sink1` (
  `emp_no` int NOT NULL COMMENT '',
  `birth_date` date NULL COMMENT '',
  `first_name` varchar(20) NULL COMMENT '',
  `last_name` varchar(20) NULL COMMENT '',
  `gender` string NULL COMMENT '',
  `hire_date` date NULL COMMENT '',
  `database_name` varchar(50) NULL COMMENT '',
  `table_name` varchar(200) NULL COMMENT '',
PRIMARY KEY(`emp_no`, `birth_date`)
NOT ENFORCED
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

Implementation of the source side:

label=0
for t_name in $(cat ../conf/doris/tables | awk -F '\n' '{print $1}' | awk -F '.' '{print $2}')
        do
        let label++
        d_label=`cat ../conf/mysql/tables | awk "NR == $label"{print} |sed 's/\./\_/g'`
        m_d=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        d_d=`cat ../conf/doris/tables | awk "NR == $label"{print}|awk -F '.' '{print $1}'`
        m_t=`cat ../conf/mysql/tables | awk "NR == $label"{print}|awk -F '.' '{print $2}'`
        sed -i "0,/doris_username/s/doris_username/${fe_master_username}/" ../result/$t.sql
        sed -i "0,/doris_password/s/doris_password/${fe_master_password}/" ../result/$t.sql
        sed -i "0,/doris_table/s/doris_table/${d_d}.${t_name}/" ../result/$t.sql
        sed -i "0,/doris_connector/s/doris_connector/doris/" ../result/$t.sql
        sed -i "0,/doris_fenodes/s/doris_fenodes/${fe_master_host}:${fe_load_url_port}/" ../result/$t.sql
        sed -i "0,/doris_label\-prefix/s/doris_label\-prefix/${m_d}_${m_t}\_`date "+%y%m%d_%H%M%S"_$label`/" ../result/$t.sql

done
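
The last substitution builds the `sink.label-prefix` from the MySQL database name, the table name, a timestamp, and the line index, which yields values such as `emp_1_employees_1_220928_000418_1`. A minimal sketch of just that part, with a hypothetical function name:

```shell
# label_prefix DB TABLE INDEX: print a label prefix of the form
# <db>_<table>_<yymmdd>_<HHMMSS>_<index>.
label_prefix() {
    printf '%s_%s_%s_%s\n' "$1" "$2" "$(date '+%y%m%d_%H%M%S')" "$3"
}

label_prefix emp_1 employees_1 1
```

Including the timestamp gives each generated job a fresh prefix. As far as I understand the connector, Doris rejects a load whose label has already been used, so regenerating the SQL without changing the prefix would be likely to fail on restart.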

create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment '',
  `database_name` varchar(50) null comment '',
  `table_name` varchar(200) null comment '',
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

Implementation of the sink side:

for i in $(cat ../conf/mysql/tables |grep -v '#' | awk -F '\n' '{print $1}'|awk -F '.' '{print $1}' |sort -u)
        do
        sed -i '1iBEGIN STATEMENT SET;' ../result/insert_$i.sql
        sed -i '$aEND;' ../result/insert_$i.sql


        b=0
        for table in $(cat ../conf/doris/tables |grep -v '#' | awk -F '\n' '{print $1}')
                do
                let b++
                d_doris=`cat ../conf/doris/tables |grep -v '#' | awk "NR==$b"`
                sed -i "0,/into \`${d_doris}_sink\`/s/into \`${d_doris}_sink\`/into \`${d_doris}_sink${b}\`/" ../result/insert_$i.sql

        done


done
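
The first two sed commands wrap all insert statements of one database in `BEGIN STATEMENT SET;` and `END;`, so that Flink submits them together as a single job. The same wrapping can be sketched as a stdin filter, which avoids the GNU-specific in-place `1i` and `$a` edits; the function name is hypothetical.

```shell
# wrap_statement_set: read insert statements on stdin and print them
# enclosed in a Flink SQL statement set.
wrap_statement_set() {
    echo 'BEGIN STATEMENT SET;'
    cat
    echo 'END;'
}

printf '%s\n' 'insert into `a_sink1` select * from `a_src`;' | wrap_statement_set
```

Without the statement set, each insert in the script would be submitted as its own job, which would defeat the one job per MySQL database layout described earlier.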

create table if not exists `demo.all_employees_info_sink1` (
  `emp_no` int not null comment '',
  `birth_date` date null comment '',
  `first_name` varchar(20) null comment '',
  `last_name` varchar(20) null comment '',
  `gender` string null comment '',
  `hire_date` date null comment '',
primary key(`emp_no`, `birth_date`)
not enforced
 ) with (
'sink.properties.two_phase_commit' = 'true',
'fenodes' = '192.168.213.162:8031',
'username' = 'root',
'password' = 'zykj2021',
'table.identifier' = 'demo.all_employees_info',
'connector' = 'doris',
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1');

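Restoring a job from a checkpoint:

1. Small changes:

  • Get the checkpoint URL from the job's web UI (note that it changes every 30 s)

  • Edit the file contents

  • Change the label

  • Set the final checkpoint URL


  • Save the file

Problem encountered: adjusting the SQL amounts to changing the operators, so restoring from the checkpoint fails, and the operators that no longer match have to be ignored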

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://127.0.0.1/fck/78f7cb6b577fe6db19648ca63607e640/chk-66. Cannot map checkpoint/savepoint state for operator e75d4004e6c5f0908bd4077fcf200fcd to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
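
When the job is submitted through the Flink SQL client rather than the `flink run` CLI, the restore path and the skip behaviour can be set as configuration at the top of the script. The sketch below prepends two settings to a generated Flink SQL file. `execution.savepoint.path` and `execution.savepoint.ignore-unclaimed-state` are Flink configuration keys, the second being, to my understanding, the counterpart of `--allowNonRestoredState`. The function name is hypothetical, and whether the generated scripts accept the settings in this position is an assumption to verify against your Flink version.

```shell
# with_restore CHECKPOINT_PATH: read a Flink SQL script on stdin and print it
# with the restore settings placed in front of it.
with_restore() {
    printf "SET 'execution.savepoint.path' = '%s';\n" "$1"
    # Skip checkpoint state that cannot be mapped to an operator in the new job.
    printf "SET 'execution.savepoint.ignore-unclaimed-state' = 'true';\n"
    cat
}
```

For the failure above, the call would look like `with_restore hdfs://127.0.0.1/fck/78f7cb6b577fe6db19648ca63607e640/chk-66 < flink_emp_1.sql`. Skipped state is discarded, so a renamed or rewritten operator starts from empty state after the restore.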

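2. Large changes

  • Make a copy (cp) of mysql_to_doris
  • Reconfigure the table structures and run sh flinkjob.sh
  • Copy the file
  • Paste it in as a whole
  • Configure the final checkpoint URL

6. Outlook:

  • Code optimization: rename variables, tidy the formatting, and use variables to reduce the amount of code
  • Standardize data type conversion and keep extending it from there
  • Improve checkpoint handling so that synchronization only requires changing configuration options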



