
Extending Dinky with ClickHouse: A Hands-On Practice

Dinky Open Source · 2022-08-06
Abstract: In this article, Han Fei shares the practice of extending ClickHouse support on the Dinky real-time computing platform, based on the Flink SQL Connector combined with Catalogs. It covers:
  1. Preface
  2. Environment Requirements
  3. Compiling the Flink ClickHouse Connector
  4. Required Dependencies
  5. Script Preparation
  6. Hive Catalog Job
  7. Dinky MySQL Catalog Job
  8. Summary


Tips: previous posts in this series
Dinky Practice Series: Flink Catalog Metadata Management
Dinky Practice Series: FlinkCDC Whole-Database Real-Time Ingestion into Warehouses and Lakes
Dinky FlinkCDC Whole-Database Warehouse Ingestion with StarRocks
Building a Blazing-Fast Unified Analytics Platform with Flink + StarRocks + Dinky
 

GitHub repositories
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
You are welcome to follow Dinky's development~


1. Preface

Doris and ClickHouse are currently among the hottest OLAP engines in the big data field, and Dinky, a real-time computing platform built on Apache Flink, already supports both. This integration practice uses Hive Catalog and Dinky's MySQL Catalog for metadata management and writes MySQL data into ClickHouse.



2. Environment Requirements

Software      Version
CDH           6.2.0
Hadoop        3.0.0-cdh6.2.0
Hive          2.1.1-cdh6.2.0
Flink         1.13.6
Flink CDC     2.2.1
Dinky         0.6.6
MySQL         5.7
ClickHouse    22.2.2.1 (standalone)




3. Compiling the Flink ClickHouse Connector

The connector provides branches for Flink 1.12, Flink 1.13, and Flink 1.14. The following compiles the connector for Flink 1.13 as an example.

Download the Flink ClickHouse connector

    git clone https://github.com/itinycheng/flink-connector-clickhouse.git
    cd flink-connector-clickhouse/
    git checkout -b release-1.13 origin/release-1.13

Compile the Flink ClickHouse connector in IDEA

Modify the pom.xml:

      <version>1.13.2-SNAPSHOT</version>
      change to
      <version>1.13.6</version>


      <flink.version>1.13.2</flink.version>
      change to
      <flink.version>1.13.6</flink.version>


      <!-- adjust the Scala version to your environment -->
      <scala.binary.version>2.11</scala.binary.version>
      change to
      <scala.binary.version>2.12</scala.binary.version>

Once these modifications are in place, the project can be compiled (for example, with mvn clean package -DskipTests).

After compilation, the resulting jar is flink-connector-clickhouse-1.13.6.jar.



4. Required Dependencies

        # Hive dependencies
        antlr-runtime-3.5.2.jar
        hive-exec-2.1.1-cdh6.2.0.jar
        libfb303-0.9.3.jar
        flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar
        hive-site.xml
        # Hadoop dependency
        flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
        # Dinky MySQL catalog dependency
        dlink-catalog-mysql-1.13-0.6.6-SNAPSHOT.jar
        # Dinky Hadoop dependency
        flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
        # MySQL driver dependency
        mysql-connector-java-8.0.21.jar
        # ClickHouse dependencies
        clickhouse-jdbc-0.2.6.jar
        flink-connector-clickhouse-1.13.6.jar
        # Flink CDC dependency
        flink-sql-connector-mysql-cdc-2.2.1.jar

Notes:

1. Place the Hive dependencies under $FLINK_HOME/lib and $DINKY_HOME/plugins.

2. Place the Hadoop dependency under $FLINK_HOME/lib.

3. Place the MySQL driver dependency under $FLINK_HOME/lib and $DINKY_HOME/plugins.

4. Place the ClickHouse dependencies under $FLINK_HOME/lib and $DINKY_HOME/plugins.

5. Place the Dinky MySQL catalog dependency under $FLINK_HOME/lib.

6. Place the Flink CDC dependency under $FLINK_HOME/lib and $DINKY_HOME/plugins.

7. Place the Dinky Hadoop dependency under $DINKY_HOME/plugins (download it from the network drive or the group announcement).

After placing all the dependencies, restart the Flink cluster and Dinky.




5. Script Preparation


MySQL DDL

          # MySQL DDL for the products table
          CREATE TABLE bigdata.products (
          id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
          name VARCHAR(255) NOT NULL,
          description VARCHAR(512)
          );


          ALTER TABLE bigdata.products AUTO_INCREMENT = 101;


          INSERT INTO bigdata.products
          VALUES (default,"scooter","Small 2-wheel scooter"),
          (default,"car battery","12V car battery"),
          (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
          (default,"hammer","12oz carpenter's hammer"),
          (default,"hammer","14oz carpenter's hammer"),
          (default,"hammer","16oz carpenter's hammer"),
          (default,"rocks","box of assorted rocks"),
          (default,"jacket","water resistent black wind breaker"),
          (default,"spare tire","24 inch spare tire");

ClickHouse DDL

            DROP TABLE IF EXISTS test.test_orders;
            CREATE TABLE test.test_orders (
            order_id Int64 NOT NULL ,
            order_date DATETIME NOT NULL,
            customer_name String NOT NULL,
            price DECIMAL(10, 5) NOT NULL,
            product_id Int64 NOT NULL,
            order_status BOOLEAN NOT NULL -- Whether order has been placed
            )
            ENGINE = MergeTree()
            ORDER BY order_id
            PRIMARY KEY order_id;




6. Hive Catalog Job


Create the job script

              SET table.local-time-zone = Asia/Shanghai;
              SET execution.runtime-mode = streaming;
              SET execution.checkpointing.interval = 60000;
              SET execution.checkpointing.tolerable-failed-checkpoints = 10;
              SET execution.checkpointing.timeout = 10000;
              SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
              SET execution.checkpointing.mode = EXACTLY_ONCE;
              SET execution.checkpointing.unaligned = true;
              SET restart-strategy = fixed-delay;
              SET restart-strategy.fixed-delay.attempts = 5;
              SET restart-strategy.fixed-delay.delay = 30s;
              SET table.exec.source.cdc-events-duplicate = true;
              SET table.sql-dialect = default;
              SET pipeline.name = hive_catalog_cdc_orders;
              SET jobmanager.memory.process.size = 1600m;
              SET taskmanager.memory.process.size = 1780m;
              SET taskmanager.memory.managed.size = 512m;
              SET taskmanager.numberOfTaskSlots = 2;
              SET yarn.application.queue = root.users.flink;
              LOAD MODULE hive WITH ('hive-version' = '2.1.1');
              CREATE CATALOG qhc_ods_catalog WITH (
              'type' = 'hive',
              'default-database' = 'default',
              'hive-version' = '2.1.1',
              'hive-conf-dir' = '/etc/hive/conf',
              'hadoop-conf-dir' = '/etc/hadoop/conf'
              );


              DROP TABLE IF EXISTS qhc_ods_catalog.qhc_ods.ods_orders_src;
              CREATE TABLE IF NOT EXISTS qhc_ods_catalog.qhc_ods.ods_orders_src (
              `order_id` int COMMENT ''
              , `order_date` timestamp(3) COMMENT ''
              , `customer_name` string COMMENT ''
              , `price` decimal(12,2) COMMENT ''
              , `product_id` int COMMENT ''
              , `order_status` tinyint COMMENT ''
              ,PRIMARY KEY(order_id) NOT ENFORCED
              ) COMMENT ''
              WITH (
              'connector' = 'mysql-cdc'
              ,'hostname' = '192.168.0.4'
              ,'port' = '3306'
              ,'username' = 'root'
              ,'password' = '123456'
              ,'server-time-zone' = 'Asia/Shanghai'
              ,'scan.incremental.snapshot.enabled' = 'true'
              ,'scan.startup.mode'='initial'
              ,'scan.incremental.snapshot.chunk.size' = '20000'
              ,'heartbeat.interval' = '120s'
              ,'database-name' = 'bigdata'
              ,'table-name' = 'orders'
              );


              DROP TABLE IF EXISTS qhc_ods_catalog.qhc_ods.ods_orders_sink;
              CREATE TABLE IF NOT EXISTS qhc_ods_catalog.qhc_ods.ods_orders_sink (
              `order_id` BIGINT COMMENT ''
              , `order_date` timestamp(3) COMMENT ''
              , `customer_name` string COMMENT ''
              , `price` decimal(12,5) COMMENT ''
              , `product_id` BIGINT COMMENT ''
              , `order_status` tinyint COMMENT ''
              ,PRIMARY KEY(order_id) NOT ENFORCED
              ) COMMENT ''
              WITH (
              'connector' = 'clickhouse',
              'url' = 'clickhouse://192.168.0.5:8123',
              'username' = 'default',
              'password' = '123456',
              'database-name' = 'test',
              'table-name' = 'test_orders',
              'sink.batch-size' = '500',
              'sink.flush-interval' = '1000',
              'sink.max-retries' = '3'
              );


              INSERT INTO qhc_ods_catalog.qhc_ods.ods_orders_sink
              SELECT * FROM qhc_ods_catalog.qhc_ods.ods_orders_src;
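
Because these tables are registered in the Hive Metastore, their definitions persist across sessions. A quick way to confirm what got registered, using standard Flink SQL catalog commands (a sanity check, not part of the job script):

    -- switch to the Hive catalog and list the registered tables
    USE CATALOG qhc_ods_catalog;
    USE qhc_ods;
    SHOW TABLES;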


Submit the Flink job

As shown below, the Flink job is running normally.


Check ClickHouse
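
To verify the initial sync by hand, query the target table in clickhouse-client, for example:

    SELECT * FROM test.test_orders ORDER BY order_id;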




7. Dinky MySQL Catalog Job


Create the job script

                DROP TABLE IF EXISTS ods_orders_src;
                CREATE TABLE IF NOT EXISTS ods_orders_src (
                `order_id` int COMMENT ''
                , `order_date` timestamp(3) COMMENT ''
                , `customer_name` string COMMENT ''
                , `price` decimal(12,2) COMMENT ''
                , `product_id` int COMMENT ''
                , `order_status` tinyint COMMENT ''
                ,PRIMARY KEY(order_id) NOT ENFORCED
                ) COMMENT ''
                WITH (
                'connector' = 'mysql-cdc'
                ,'hostname' = '192.168.0.4'
                ,'port' = '3306'
                ,'username' = 'root'
                ,'password' = '123456'
                ,'server-time-zone' = 'Asia/Shanghai'
                ,'scan.incremental.snapshot.enabled' = 'true'
                ,'scan.startup.mode'='initial'
                ,'scan.incremental.snapshot.chunk.size' = '20000'
                ,'heartbeat.interval' = '120s'
                ,'database-name' = 'bigdata'
                ,'table-name' = 'orders'
                );


                DROP TABLE IF EXISTS ods_orders_sink;
                CREATE TABLE IF NOT EXISTS ods_orders_sink (
                `order_id` BIGINT COMMENT ''
                , `order_date` timestamp(3) COMMENT ''
                , `customer_name` string COMMENT ''
                , `price` decimal(12,5) COMMENT ''
                , `product_id` BIGINT COMMENT ''
                , `order_status` tinyint COMMENT ''
                ,PRIMARY KEY(order_id) NOT ENFORCED
                ) COMMENT ''
                WITH (
                'connector' = 'clickhouse',
                'url' = 'clickhouse://192.168.0.5:8123',
                'username' = 'default',
                'password' = '123456',
                'database-name' = 'test',
                'table-name' = 'test_orders',
                'sink.batch-size' = '500',
                'sink.flush-interval' = '1000',
                'sink.max-retries' = '3'
                );


Create the initialization job script

After executing the job, query the Dinky metadata database to confirm that the tables now exist.
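
A quick check against the metadata database (a sketch; it assumes Dinky's default database name dlink and the metadata_ table prefix used by its MySQL catalog, so adjust to your installation):

    -- run against the MySQL instance that hosts Dinky's metadata
    SHOW TABLES FROM dlink LIKE 'metadata%';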


View the metadata tables

Each execution of the initialization DDL updates Flink's metadata. The structure tree on the left shows the catalog's tables, views, UDFs, and other objects.


Submit the Flink job

In the mysql_catalog_cdc_orders job, use an INSERT statement to write the data into ClickHouse.

                  SET table.local-time-zone = Asia/Shanghai;
                  SET execution.runtime-mode = streaming;
                  SET execution.checkpointing.interval = 60000;
                  SET execution.checkpointing.tolerable-failed-checkpoints = 10;
                  SET execution.checkpointing.timeout = 10000;
                  SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
                  SET execution.checkpointing.mode = EXACTLY_ONCE;
                  SET execution.checkpointing.unaligned = true;
                  SET restart-strategy = fixed-delay;
                  SET restart-strategy.fixed-delay.attempts = 5;
                  SET restart-strategy.fixed-delay.delay = 30s;
                  SET table.exec.source.cdc-events-duplicate = true;
                  SET table.sql-dialect = default;
                  SET pipeline.name = mysql_catalog_cdc_orders;
                  SET jobmanager.memory.process.size = 1600m;
                  SET taskmanager.memory.process.size = 1780m;
                  SET taskmanager.memory.managed.size = 512m;
                  SET taskmanager.numberOfTaskSlots = 2;
                  SET yarn.application.queue = root.users.flink;


                  INSERT INTO ods_orders_sink
                  SELECT * FROM ods_orders_src;


Insert data into the source database

                    INSERT INTO bigdata.orders
                    VALUES (default, '2020-07-30 12:12:30', 'lucy', 25.25, 10000, true);


Check ClickHouse
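
The newly inserted row should appear in ClickHouse shortly after the next checkpoint completes; for example:

    SELECT * FROM test.test_orders WHERE customer_name = 'lucy';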




8. Summary

This practice used Flink's native Hive Catalog and Dinky's MySQL Catalog, in turn, to synchronize data from a MySQL source database into a ClickHouse local table via Flink CDC. In this way, Flink real-time jobs and ClickHouse ETL jobs can be handled uniformly on the Dinky real-time computing platform.





Communication

You are welcome to join the community to share, discuss, and contribute.

QQ group: 543709668; please note "Dinky + company + position" when applying, otherwise the request will not be approved.

Official WeChat group (recommended): add wenmo_ai with the note "Dinky + company + position"; requests without the note will not be approved.

DingTalk community group (recommended):

WeChat official account: DataLink数据中台






