暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SQL实战 — 数据库的结合应用

2225
本篇是Flink SQL实战系列文章的第四篇,主要介绍JDBC connector和mysql-cdc与Flink SQL的结合使用。数据库通常作为生产环境数据源的核心存储,Flink SQL提供了JDBC connector通过JDBC协议来支持数据库的连接,只要兼容JDBC协议的数据库都可以通过该方式,包括MySQL/PostgreSQL/TiDB等。另外,基于数据库的CDC (Change Data Capture)在实际生产中也发挥着重要的作用,Flink SQL提供了mysql-cdc connector 用来追踪MySQL数据库表的增删改操作。作为Flink SQL的核心开发者云邪和雪尽等大佬也写了不少文章来介绍jdbc和cdc connector以及changlog背后的原理,大家可以根据文章末尾的链接来仔细阅读,然后动手实践加深理解。本文中将会基于jdbc和mysql-cdc这两个connector在Flink SQL的实战,介绍的内容如下所示:


  • 数据准备

  • MySQL作为Flink计算的数据源

  • MySQL作为Flink计算的结果存储

  • TiDB在Flink SQL中的使用

  • CDC在Flink SQL中的使用

  • 总结



01 数据准备


本篇的数据依旧沿用第二篇中创建好的数据,对于jdbc connector的实战,继续计算点击流写到MySQL和TiDB;对于mysql-cdc connector,则需要临时创建table 和以及增删改数据,具体流程放在讲述cdc小节里面。


02 MySQL作为Flink计算的数据源


Flink SQL实战的第二篇文章介绍过MySQL 作为维度表进行temporal table join的应用,这里不再赘述;本小节主要介绍在MySQL作为source的时候,对scan.* 参数的使用进行探索,这四个参数如下所示:


    'scan.partition.column'
    'scan.partition.num'
    'scan.partition.lower-bound'
    'scan.partition.upper-bound'
    • 我们根据Flink SQL实战第二篇的广告位, 创建未配置scan相关参数的Flink MySQL table ,详细语句如下所示:

      # 创建 Flink MySQl 表, 未配置scan相关的参数
      CREATE TABLE check_mysql_scan (
      ID BIGINT,
      name STRING
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'url',
      'table-name' = 'adspace',
      'username' = 'username',
      'password' = 'password'
      );
      # 提交查询, 通过jobGraph,来验证数据
      select MAX(ID) from check_mysql_scan;




      • 我们根据Flink SQL实战第二篇的广告位, 创建配置scan相关参数的Flink MySQL table ,详细语句如下所示:


        # 创建 Flink MySQl 表, 配置scan相关的参数


        CREATE TABLE check_mysql_scan (ID BIGINT, name STRING) WITH (
        'scan.partition.column' = 'id',
        'scan.partition.num' = '3',
        'scan.partition.lower-bound' = '20',
        'scan.partition.upper-bound' = '100',
        'connector' = 'jdbc',
        'url' = 'url',
        'table-name' = 'adspace',
        'username' = 'username',
        'password' = 'password'
        );
        # 提交查询, 通过jobGraph,来验证数据
        select MAX(ID) from check_mysql_scan ;




        • 关于Flink SQL JDBC connector的partition相关参数的相关结论如下所示: 

         

            'scan.partition.column' = 'id',
          'scan.partition.num' = '3',
          'scan.partition.lower-bound' = '20',
          'scan.partition.upper-bound' = '100',
          1. 上述4个参数是MySQL表作为source表的选填参数,当MySQL表数据量很小的时候,可以不用配置;当数据量大的时候,可以同时配置这四个参数来提高读取效率(必须同时填写这4个 参数)

          2. 填写这四个参数相当于做了优化方案, 将一个slot上跑的task执行查询任务(就算查询SQL job的并行度大于1,对于sourc算子来说,只有一个slot有用,也就说单个slot上的task扫描全表,当数据量大的时候,task很容易失败,重启也解决不了这个问题)基于上下界和分区个数,形成 多个并行查询,大大提高了效率。对于上述参数,实际上拆成三个查询如下所示:

            select * from check_mysql_scan where id between 20 and 46;
            select * from check_mysql_scan where id between 47 and 73;
            select * from check_mysql_scan where id between 74 and 100;
            1. 如何配置这个4个参数:

                    scan.partition.column:作为分区的字段名,建议使用在MySQL中定义为索引的字段作为分区

                    scan.partition.num:分区个数,几个分区对应着几个子查询,一般来说这参数大于1的,那么后续使用该source表的SQL job的并行度最好和这个分区数相等

                    scan.partition.lower-bound:查询数据范围的下限

                    scan.partition.upper-bound:查询数据的上限

            1. 对于Flink SQL 读取MySQL的说明:

              1. 没有添加上述4个分区相关的字段的场景,只有一个slot上的source task去读取MySQL中的数据,不管Job 的并行度是多少,只有一个task去消费,也就是说类比Flink 的soure task 去消费并行度大于分区数的kafka topic,存在task空转的情况, 浪费计算资源

              2. 没有添加上述4个分区相关的字段的场景,query SQL就算有where条件,依旧是全表扫描

              3. 添加上述4个分区相关的字段的场景,Flink 自身才会基于scan.partition.column的lower-bound和upper-bound生产where条件,从而形成谓词下推,避免全表扫描


            03 MySQL作为Flink计算的结果存储


            MySQL 作为使用JDBC接口最为人熟知的数据库之一,在各个公司里面使用非常广泛。在实时场景中有很多应用,比如作为实时报表的存储供前端应用来查询,比如实时广告位的top点击数可视化,帮助产品运营优化广告投放策略以及机器学习模型的效果验证等。第二篇中创建好的点击流,我们对点击流做个1分钟的翻转窗口计算分广告位的点击数,通过Flink SQL 提供的JDBC connector 写入MySQL表中,再通过Superset进行可视化以及查询。

            • 首先,需要在MySQL中创建物理表,作为数据真正存储的地方;然后,需要创建Flink MySQL table,通过Flink SQL提供的JDBC connector连接起来;最后,对点击流做个1分钟的翻转窗口计算分广告位的点击数编写SQL job 逻辑;具体的SQL语句如下所示:

              # 创建mysql 物理表
              CREATE TABLE `click_report` (
              adspace_id_str varchar(32),
              adspace_id int,
              window_start_time BIGINT,
              click_count bigint(20) DEFAULT NULL,
              PRIMARY KEY (`adspace_id_str`)
              ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
              # 注册 Flink MySQL table




              CREATE TABLE dwa_mysql_click_report (
              adspace_id_str STRING,
              adspace_id int,
              window_start_time BIGINT,
              click_count BIGINT,
              PRIMARY KEY (adspace_id_str) NOT ENFORCED
              ) WITH (
              'connector' = 'jdbc',
              'url' = 'url',
              'driver' = 'com.mysql.jdbc.Driver',
              'table-name' = 'click_report',
              'username' = 'root',
              'password' = '123456',
              'sink.buffer-flush.max-rows' = '10',
              'sink.buffer-flush.interval' = '2s',
              'sink.max-retries' = '2'
              );
              # 窗口计算逻辑,写入MySQL
              INSERT INTO dwa_mysql_click_report
              SELECT
              CONCAT(CAST(publisher_adspace_adspaceId AS STRING) ,
              '_',
              CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
              ),
              publisher_adspace_adspaceId,
              UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000,
              COUNT(audience_behavior_click_creative_impressionId)
              FROM
              adsdw_dwd_max_click_mobileapp
              WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL
              GROUP BY
              TUMBLE(ets, INTERVAL '1' MINUTE),
              publisher_adspace_adspaceId;
              • 在MySQL中创建物理表以及数据查询验证,在Flink SQL client中创建 Flink MySQL table 以及提交SQL job,以及提交Flink Cluster上的job状态如下所示: 




              • Flink SQL 将计算结果写到MySQL中,后续需要查询或者绘制实时大盘。Airbnb 开源的Superset是优秀的可视化工具代表之一,在我们的生产中Superset已经可视化了MySQL/Hive(Presto + Alluxio来query数据)/druid中的数据,我们可以Superset 的SQL lab查询MySQL中的数据,如下所示:



              强大的superset还支持绘制chart和dashboard,我们可以绘制chart放在dashboard上,如下图所示:



              04 TiDB在Flink SQL中的使用


              数据库领域一直竞争激烈,适合各种场景、各种类型的数据库层出不穷,数据库也是云厂商争抢的细分领域之一,TiDB作为数据库领域新贵,广受好评。根据TiDB官网的介绍,TiDB 是 PingCAP 公司自主设计、研发的开源分布式关系型数据库,是一款同时支持在线事务处理与在线分析处理 (Hybrid Transactional and Analytical Processing, HTAP)的融合型分布式数据库产品,具备水平扩容或者缩容、金融级高可用、实时 HTAP、云原生的分布式数据库、兼容 MySQL 5.7 协议和 MySQL 生态等重要特性。目前,TiDB在我们公司已经大规模部署,很多核心业务已经从MySQL迁移到TiDB上了,在本文末尾也有介绍TiDB在360的使用的链接。在本文中,我们使用JDBC connector来验证Flink SQL做计算写TiDB的场景,由于TiDB完全兼容MySQL 5.7协议, 因此,我们创建同样TiDB 表、Flink TiDB 表以及做同样的计算逻辑写入到TiDB,只有TiDB的连接信息稍作改动。

              • 首先,需要在TiDB中创建物理表,作为数据真正存储的地方;然后,需要创建Flink TiDB table,通过Flink SQL提供的JDBC connector连接起来,只有这里JDBC的连接信息与MySQL不一样;最后,对点击流做个1分钟的翻转窗口计算分广告位的点击数编写SQL job 逻辑;具体的SQL语句如下所示:


                # 创建TiDB 物理表
                CREATE TABLE `click_report_tidb` (
                adspace_id_str varchar(32),
                adspace_id int,
                window_start_time BIGINT,
                click_count bigint(20) DEFAULT NULL,
                PRIMARY KEY (`adspace_id_str`)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
                # 注册 Flink TiDB table
                CREATE TABLE dwa_tidb_click_report (
                adspace_id_str STRING,
                adspace_id int,
                window_start_time BIGINT,
                click_count BIGINT,
                PRIMARY KEY (adspace_id_str) NOT ENFORCED
                ) WITH (
                'connector' = 'jdbc',
                'url' = 'url',
                'driver' = 'com.mysql.jdbc.Driver',
                'table-name' = 'click_report_tidb',
                'username' = 'username',
                'password' = 'password',
                'sink.buffer-flush.max-rows' = '10',
                'sink.buffer-flush.interval' = '2s',
                'sink.max-retries' = '2'
                );
                # 窗口计算逻辑,写入TiDB
                INSERT INTO dwa_tidb_click_report
                SELECT
                CONCAT(CAST(publisher_adspace_adspaceId AS STRING) ,
                '_',
                CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
                ),
                publisher_adspace_adspaceId,
                UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000,
                COUNT(audience_behavior_click_creative_impressionId)
                FROM
                adsdw_dwd_max_click_mobileapp
                WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL
                GROUP BY
                TUMBLE(ets, INTERVAL '1' MINUTE),
                publisher_adspace_adspaceId;


                • 在TiDB中创建物理表以及数据查询验证,在Flink SQL client中创建 Flink MySQL table 以及提交SQL job,以及提交Flink cluster上的job状态如下所示:





                • 数据可视化同上,依旧使用Superset 查询和dashboard展示,具体效果如下所示:





                05 CDC在Flink SQL中的使用


                CDC在我们实际生产中使用的也非常广泛,如存储在MySQL中的物料表的实时更新操作同步到Kafka中,可供广告投放引擎消费来及时获得最新的物料数据,也可参与到实时报表计算中。目前的方案是通过MaxWell读取MySQL的binlogs 以json格式写到Kafka;所以MaxWell的稳定可靠性非常重要,虽说已经对MaxWell进行容器化调度了,HA做的比较好,但是从整个pipeline上看,还是过于依赖MaxWell服务。Flink SQL 的mysql-cdc这个connector ,天然的集成了CDC(采用内嵌的Debezium)帮助用户实时追踪MySQL的增删改操作,Flink SQL内部提供changelog机制,通过changelog-json format写到Kafka中,这样就能够将CDC流的处理在Flink这一套框架内完成,不需要单独部署MaxWell。

                • 首先,我们先建一个MySQL 表,以及注册Flink MySQL table,对MySQL表进行增删改操作,并且在sql-client 实时print 结果;

                  # 创建MySQL table
                  create database demo;
                  use demo;
                  show tables;
                  CREATE TABLE `test` (
                  `id` bigint(20) NOT NULL AUTO_INCREMENT,
                  `age` int(11) DEFAULT NULL,
                  `name` varchar(255) DEFAULT NULL,
                  PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
                  select * from test;
                  # 对MySQL表进行增删改操作
                  insert into test values(1,11,"zhoujielun");
                  insert into test values(2,22,"12dss");
                  insert into test values(3,33,"asdad");
                  insert into test values(4,44,"linghuchong");
                  update test set name = 'change_name' where id = 2;
                  delete from test where id=1;
                  #注册Flink MySQL table




                  CREATE TABLE cdc(
                  id BIGINT,
                  age INT,
                  name STRING
                  ) WITH (
                  'connector' = 'mysql-cdc',
                  'hostname' = 'hostname',
                  'port' = '3309',
                  'username' = 'root',
                  'password' = '123456',
                  'database-name' = 'demo',
                  'table-name' = 'test'
                  );




                  在实际场景中,通过mysql-cdc conenctor一定是流到某个存储中,可以是保存在Kafka,也可以保存ES中(放在后续章节中讲述),然后供下游使用。由于Kafka 设计的原因,不支持撤回流,直接用Kafka connector 搭配json format这种方式是无法写入Kafka中的,好在Flink  SQL同时也提供了changelog-json format,通过Kafka connector 和changelog-json format的组合将数据写到Kafka中;

                  • 因此,我们只需要注册Flink Kafka table,使用changelog-json format即可,具体的建表语句如下所示:

                    # 注册Flinl Kafka table
                    CREATE TABLE flink_rtdw.demo.kafka_changelog_table (
                    id BIGINT,
                    age INT,
                    name STRING
                    ) WITH (
                    'connector' = 'kafka',
                    'topic' = 'changelog_json_format_topic',
                    'properties.bootstrap.servers' = 'broker1:9092,broker2:9092,3:9092',
                    'format' = 'changelog-json'
                    );
                    # 将changelog 写到kafka中
                    INSERT INTO kafka_changelog_table
                    select id, age,name
                    from cdc


                    • 在sql-client中创建Flink kafka table,以及提交insert job 如下所示:




                    通过mysql-cdc以及changelog-json抽取数据到Kafka的整个链接跑起来,那么我们来更新数据做下验证,更新语句操作已经查询结果:
                      # 在MySQL中执行如下操作
                      delete from test where id = 2;
                      update test set name = 'changeling_json_kafka' where id = 4;




                      06 总结

                         

                      本章主要介绍jdbc 和 mysql-cdc这两个connector在Flink SQL中的使用。Flink SQL的jdbc connector支持JDBC协议数据库的连接,本文重点介绍了Flink SQL读取MySQL时候关于scan参数的使用,也介绍了MySQL/TiDB作为Flink SQL计算结果存储的使用。mysql-cdc的功能非常强大,使得Flink对MySQL的CDC流得到原生的支持,可以结合其他的connector 在更多场景中进行探索使用。


                      参考:



                      文章转载自数据架构那些事,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论