暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dinky 扩展 ChunJun 的实践分享

Dinky开源 2022-07-08
248
摘要:本文介绍了 Dinky 实时计算平台扩展 ChunJun 的实践分享。内容包括:
  1. 前言
  2. 部署 Flink+ChunJun
  3. 部署 Dinky
  4. 示例分享
  5. 总结
  6. 用户体验


Tips:历史传送门
Dinky 开源一周年了~
Dinky 扩展 iceberg 的实践分享
Dinky 扩展 kudu 实践分享
Dinky 构建 Flink CDC 整库入仓入湖
 

 GitHub 地址 
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~


一、前言

ChunJun(原FlinkX)是一个基于 Flink 提供易用、稳定、高效的批流统一的数据集成工具,既可以采集静态的数据,比如 MySQL,HDFS 等,也可以采集实时变化的数据,比如 binlog,Kafka等。同时 ChunJun 也是一个支持原生 FlinkSql所有语法和特性的计算框架。

ChunJun 具有丰富的插件种类,多达40种,如常见的 mysql、binlog、logminer 等,大部分插件都支持 source/reader、sink/writer 及维表功能。目前很多用户在思考能否在 Dinky 上使用 ChunJun 的插件以提供更全面的能力。那本文将带来如何在 Dinky 上集成 ChunJun 丰富的插件,其实简单,那我们开始吧。


二、部署 Flink+ChunJun

编译

    注意,如果需要集成 Dinky,需要将 ChunJun项目下的 chunjun-core 的pom 文件中的 logback-classic 和 logback-core 注释掉,否则容易在 Dinky 执行 sql 任务的时候报错。

    <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.7</version>
    </dependency>




    <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.7</version>
    </dependency

            然后执行:

      mvn clean package  -DskipTests

      部署

      使用 ChunJun 需要先部署 Flink 集群,其部署本文不再做指导。

      值得注意的是,如果你需要调用 Flinkx 的 connect jar 的话,则需要将 classloader.resolve-order 改成 parent-first。修改完成配置以后,把 Flinkx 的 jar 包复制过来,主要是 chunjun-clients-master.jar(Flinkx 现在改名 ChunJun )以及 chunjun 的其它 connector 放到 flink/lib 目录下,如图所示。


      异常处理

      如果启动集群时出现异常,即 Flink standalone 集群加载 flinkx-dist 里 jar 包之后,集群无法启动,日志报错:Exception in thread "main" java.lang.NoSuchFieldError: EMPTY_BYTE_ARRAY.

        Exception in thread"main"java.lang.NoSuchFieldError:EMPTY_BYTE_ARRAY
        at org.apache.logging.log4j.core.config.ConfigurationSource.<clinit>(ConfigurationSource.java:56)
        at org.apache.logging.log4j.core.config.NullConfiguration.<init>(NullConfiguration.java:32)
        at org.apache.logging.log4j.core.LoggerContext.<clinit>(LoggerContext.java:85)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.log4j.LogManager.<clinit>(LogManager.java:72)
        at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305)
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.<clinit>(ClusterEntrypoint.java:107)

        原因:这个报错是因为 log4j 版本不统一导致的,因为 flinkx-dist 中部分插件引用的还是旧版本的 log4j 依赖,导致集群启动过程中,出现了类冲突问题;

        方案:临时方案是将 flink lib 中 log4j 相关的jar包名字前加上字符 ‘a‘,使得flink standalone jvm 优先加载。




        三、部署 Dinky


        编译

          git clone https://github.com/DataLinkDC/dlink.git
          cd dlink
          mvn clean install -Dmaven.test.skip=true

          编译完成后的压缩包在 Dinky 根目录下得 build 文件夹下。

          部署

          1、上传dlink压缩包到部署服务器

          2、解压

            tar -zxvf dlink-release-0.6.4.tar.gz
            mv dlink-release-0.6.4 dlink0.6.4
            cd dlink0.6.4

            3、数据库初始化

            4、把 flink 的 jar 放到 dlink 目录下

              cd dlink0.6.4
              mkdir plugins
              cp -r opt/flink/lib/flink-*.jar .

              切换 Dinky 的 Flink 版本

              因为目前 flinkx 的稳定版本是 1.12.7,所以我们把 dlink 默认的 client 版本修改为 1.12

                cd dlink0.6.4/lib
                rm -rf dlink-client-*.jar
                cp -r ../extends/dlink-client-1.12-0.6.4.jar .

                lib下的目录如图:

                注意:因为我没有用上 dlink-connector-jdbc 的 jar 包,所以图中的 dlink-connector-jdbc-1.13-0.6.4-SNAPSHOT.jar 没有换成1.12版本的,可以去掉。

                启动

                启动命令

                  sh auto.sh start

                  注册集群实例

                      在集群实例中注册已经启动的 Flink 集群。


                  四、示例分享


                  添加依赖

                      这里演示 mysql->mysql 的同步作业,所以需要 Flinkx 的 mysql-connector.jar 以及核心 jar。


                  编写作业

                  Mysql   DDL:

                    CREATE TABLE `datasource_classify` (
                    `id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
                    `classify_code` varchar(64) NOT NULL COMMENT '类型栏唯一编码',
                    `sorted` int NOT NULL DEFAULT '0' COMMENT '类型栏排序字段 默认从0开始',
                    `classify_name` varchar(64) NOT NULL COMMENT '类型名称 包含全部和常用栏',
                    `is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否删除,1删除,0未删除',
                    `gmt_create` datetime DEFAULT CURRENT_TIMESTAMP,
                    `gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    PRIMARY KEY (`id`),
                    UNIQUE KEY `classify_code` (`classify_code`)
                    ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='数据源分类表';

                    Flink  Sql:

                      CREATE TABLE source
                      (
                      id bigint,
                      classify_code STRING,
                      sorted int,
                      classify_name STRING,
                      is_deleted int,
                      gmt_create timestamp(9),
                      gmt_modified timestamp(9),
                      PRIMARY KEY (id) NOT ENFORCED
                      ) WITH (
                      'connector' = 'mysql-x',
                      'url' = 'jdbc:mysql://192.168.31.101:3306/datasource?useSSL=false',
                      'table-name' = 'datasource_classify',
                      'username' = 'root',
                      'password' = 'root'
                      ,'scan.fetch-size' = '2'
                      ,'scan.query-timeout' = '10'
                      );


                      CREATE TABLE sink
                      (
                      id bigint,
                      classify_code STRING,
                      sorted int,
                      classify_name STRING,
                      is_deleted int,
                      gmt_create timestamp(9),
                      gmt_modified timestamp(9),
                      PRIMARY KEY (id) NOT ENFORCED
                      ) WITH (
                      'connector' = 'mysql-x',
                      'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false',
                      'table-name' = 'datasource_classify',
                      'username' = 'root',
                      'password' = 'root'
                      ,'scan.fetch-size' = '2'
                      ,'scan.query-timeout' = '10'
                      );


                      insert into sink
                      select *
                      from source u;

                      执行任务

                          选中 Yarn Session 模式提交作业。

                          提交后可从执行历史查看作业提交状况。

                          进程中可以看的 Flink 集群上批作业执行完成。

                      对比数据

                          源库:

                          目标库:

                          同步成功,很丝滑。



                      五、总结

                      在集成 ChunJun 的时候遇到的问题大部分都是缺包以及包冲突,所以只需要注意一下这个问题就能比较好的进行集成。

                      在集成服务的时候建议是,先把 Flink 和 ChunJun 进行集成,确保服务能够正常启用以后再进行 Dinky 的集成,这样有利于快速定位查找问题,如果遇到文章之外的问题,也可以查看 Dinky 官网FAQ | Dinky (dlink.top) chunjun的官网QuickStart | ChunJun 纯钧 (dtstack.github.io),看看是否有类似问题的解决办法作为参考。


                      六、用户体验

                      因为本人目前还是处于学习使用的过程中,所以很多功能没有好好使用,待自己研究更加透彻后希望写一篇文章,优化官网的用户手册。以下的优缺点以及建议都是目前我在使用学习的过程中遇到的问题。

                      优点:

                      Dinky 最吸引我的地方应该就是 sql 编辑模版了,直接快捷键生成 sql 模版,在开发测试中屡试不爽。在集成了 ChunJun(Flinkx) 以后,能够做到多源数据的离线跑批任务及日常小批量实时任务的同步。支持各种类型的任务执行方式。

                      缺点:

                      ui 上适配还有点小问题,例如:打开 F12 调整宽度后,再关闭,页面 ui 不会自适应,需要刷新。

                      期待改进点:

                      1、更多的自定义异常、业务异常

                      2、增加新的向导模式,结合数据源,通过 webUI 可以一键引入字段或者勾选需要的字段,生成 Flink Sql 的一大部分配置

                        CREATE TABLE 表名
                        (
                        -- 页面勾选字段,字段从元数据直接拉取
                        id bigint,
                        classify_code STRING,
                        sorted int,
                        classify_name STRING,
                        is_deleted int,
                        gmt_create timestamp(9),
                        gmt_modified timestamp(9),
                        PRIMARY KEY (id) NOT ENFORCED
                        ) WITH (
                        -- 从选择的数据中获取
                        'connector' = 'mysql-x',
                        'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false',
                        'table-name' = 'datasource_classify',
                        'username' = 'root',
                        'password' = 'root'
                        ,
                        -- 其它非主要配置有用户自己填写
                        );

                        3、sql 历史版本管理,目前我已经提交 Feature 并被合并到 0.6.5 版本中。




                        交流

                        欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。

                        QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批

                        微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,缺一不批谢谢。

                               公众号:DataLink数据中台



                        扫描二维码获取

                        更多精彩

                        DataLink

                        数据中台




                        文章转载自Dinky开源,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论