暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

一文掌握教育行业FlinkCDC+Paimon实时湖仓案例实践总结

大数据从业者 2024-10-14
379

前言

本文记录某高校客户的实时湖仓案例实践总结。业务数据库为MySQL,最终方案为很经典的流式架构:Mysql -> FlinkCDC -> Paimon -> FlinkSQL。组件版本信息如下:

MySQL

5.7.36

FlinkCDC

3.2.0

Paimon

0.9.0

Flink

1.18.1

欢迎关注微信公众号:大数据从业者

Paimon支持以多种形式FlinkCDC实时导入源端数据与元数据变更(schema evolution)到Paimon表中。也就是说源端增加列、不用重启Flink作业、可以自动识别实时导入到Paimon表。目前,Paimon支持的CDC形式包括:Mysql、Postgres、Kafka、Mongo、Pulsar。

举例说明:将Mysql表tableA导入到Paimon,两种方式:FlinkSQL或者MySqlSyncTableAction。注意:目前只有MySqlSyncTableAction支持schema evolution。使用FlinkSQL不能将Mysql表新增字段field_4同步到Paimon,如图所示:

   

使用MySqlSyncTableAction能够将Mysql表新增字段field_4同步到Paimon,如图所示:

Schema Evolution有限支持如下:

    1. Adding columns.
    2. Altering column types. More specifically:    
         altering from a string type (char, varchar, text) to another string type with longer length,
         altering from a binary type (binary, varbinary, blobto another binary type with longer length,
         altering from an integer type (tinyintsmallintintbigintto another integer type with wider range,
    altering from a floating-point type (float, double) to another floating-point type with wider range,

    Flink编译部署  

    Flink源码编译、整合Hadoop与Hive,参见历史文章,这里不再赘述。Flink部署路径:/home/myHadoopCluster/flink-1.18.1

    FlinkCDC编译部署

      git clone -b release-3.2.0 https://github.com/apache/flink-cdc.git
      mvn clean package –DskipTests

      拷贝flink-sql-connector-mysql-cdc-3.2.0.jar到Flink部署路径

        cp flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-3.2.0.jar home/myHadoopCluster/flink-1.18.1/lib/

        Paimon编译部署   

          git clone -b release-0.9.0 https://github.com/apache/paimon.git    
          mvn clean package –DskipTests

          拷贝paimon-flink-1.18-0.9.0.jar到Flink部署路径

            cp paimon-flink/paimon-flink-1.18/target/paimon-flink-1.18-0.9.0.jar home/myHadoopCluster/flink-1.18.1/lib/

            MySQL   

            设置binlog格式,重启MySQL。

              vi etc/my.cnf
              binlog_format=ROW

              创建示例数据库felixzh_db和数据表felixzh_user

                create database felixzh_db;
                create table felixzh_db.felixzh_user(id int auto_increment primary key, name varchar(10), salary decimal(10,2));          

                Flink SQL方式实践总结  

                1. 通过sql-client创建示例catalog、mysql表、paimon表,如下:

                  CREATE CATALOG paimon_catalog WITH (
                      'type' = 'paimon',
                      'warehouse' = 'hdfs:///flink/paimon'
                  );


                  use catalog paimon_catalog;


                  CREATE TABLE paimon_users (
                      id BIGINT,
                      name STRING,
                      salary DOUBLE,
                      PRIMARY KEY (idNOT ENFORCED
                  );


                  CREATE TEMPORARY TABLE users (
                       id INT,
                       name STRING,
                       salary DECIMAL(102),
                       PRIMARY KEY(idNOT ENFORCED
                       ) WITH (
                       'connector' = 'mysql-cdc',
                       'hostname' = 'felixzh',
                       'port' = '3306',
                       'username' = 'root',
                       'password' = '123456',
                       'database-name' = 'felixzh_db',
                  'table-name' = 'felixzh_user');

                  2. 提交SQL作业    

                    insert into paimon_users select * from users;

                    3. Flink OLAP查询

                      SET 'sql-client.execution.result-mode' = 'tableau';
                      RESET 'execution.checkpointing.interval';
                      SET 'execution.runtime-mode' = 'batch';

                      4.1 Mysql表insert数据、FlinkSQL查看Paimon表

                        insert into felixzh_user values(1, 'felixzh', '3.2');

                        结论:insert数据同步正常。

                        4.2 Mysql表update数据、FlinkSQL查看Paimon表

                          update felixzh_user set salary='3.5';

                          结论:update数据同步正常。

                          4.3 Mysql表alter增加列、FlinkSQL查看Paimon表

                            alter table felixzh_user add phone varchar(20);

                            结论:alter增加列数据未同步!

                            4.4 Mysql表delete数据、FlinkSQL查看Paimon表

                              delete from felixzh_user;

                              结论:delete数据同步正常。

                              MySqlSyncTableAction方式实践  

                              1. 将paimon-flink-action相关jar拷贝到Flink部署路径

                                cp paimon-flink/paimon-flink-action/target/paimon-flink-action-0.9.0.jar /home/myHadoopCluster/flink-1.18.1/          

                                2. 提交paimon-flink-action作业,如下:

                                  ./flink run -yid application_1726125159688_0006 ../paimon-flink-action-0.9.0.jar mysql_sync_table --warehouse hdfs:///flink/paimon1/ --database  felixzh_db --table felixzh_user --primary_keys id --mysql_conf hostname=felixzh --mysql_conf username=root --mysql_conf password=123456 --mysql_conf database-name=felixzh_db  --mysql_conf table-name=felixzh_user

                                  3. 通过sql-client创建上述Paimon表对应catalog

                                    CREATE CATALOG paimon_catalog1 WITH (
                                        'type' = 'paimon',
                                        'warehouse' = 'hdfs:///flink/paimon1'
                                    );

                                    注意:catalog warehouse必须与Paimon表存储路径相同。

                                      use catalog paimon_catalog1;

                                      虽然show tables看不到库表,不影响直接查询表,如下:

                                        select * from felixzh_db.felixzh_user;

                                        4.1 Mysql表insert数据、FlinkSQL查看Paimon表

                                          insert into felixzh_user values(10, 'felixzh', '10.9');

                                          4.2 Mysql表update数据、FlinkSQL查看Paimon表

                                            update felixzh_user set salary='20.1' where id=10;

                                            4.3 Mysql表add column、FlinkSQL查看Paimon表

                                              alter table felixzh_user add phone varchar(20);

                                              4.4 Mysql表insert数据(新加的列)、FlinkSQL查看Paimon表

                                                insert into felixzh_user values(11, 'felixzh', '11.1', '110');

                                                4.5 Mysql表delete部分数据、FlinkSQL查看Paimon表

                                                  delete from felixzh_user where id<10;

                                                     

                                                  结论

                                                  推荐优先使用FlinkSQL,除非需要使用Schema Change Evolution。

                                                  文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                  评论