
Syncing MySQL Data to StarRocks with Flink

skylines 2024-04-01

This post briefly walks through using Flink to sync data from a MySQL database into a StarRocks database, covering the components involved and the overall steps.

First, the components this sync setup involves: the StarRocks SMT (StarRocks Migration Tool), Flink itself, and the two connector JARs that link the source and target ends — flink-sql-connector-mysql-cdc-3.0.1.jar (the Flink CDC source connector for MySQL) and flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar (the sink connector for StarRocks). The version requirements for the source connector are fairly loose, but the StarRocks sink connector must match your Flink and Scala versions closely.

The flink-connector-starrocks JAR (x.x.x_flink-y.yy_z.zz.jar) carries three version numbers:

  • The first, x.x.x, is the version of flink-connector-starrocks itself.

  • The second, y.yy, is the Flink version it supports.

  • The third, z.zz, is the Scala version that Flink build supports. For Flink 1.14.x and earlier, you must download a flink-connector-starrocks JAR that carries a Scala version number.
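As a quick sanity check, the three version components can be pulled apart from the JAR filename with plain shell string operations. The filename below is the one used in this post; the parsing pattern is an assumption based on the naming convention described above:

```shell
# Split a flink-connector-starrocks JAR name into its three version parts.
# Assumed pattern: flink-connector-starrocks-<x.x.x>_flink-<y.yy>_<z.zz>.jar
jar="flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar"

base="${jar%.jar}"                                 # drop the .jar suffix
connector_ver="${base#flink-connector-starrocks-}" # strip the fixed prefix
connector_ver="${connector_ver%%_*}"               # -> connector version
flink_ver="${base#*_flink-}"
flink_ver="${flink_ver%%_*}"                       # -> supported Flink version
scala_ver="${base##*_}"                            # -> Scala version

echo "connector=${connector_ver} flink=${flink_ver} scala=${scala_ver}"
# prints: connector=1.2.3 flink=1.14 scala=2.11
```

Comparing `flink_ver` and `scala_ver` against your installed Flink build before copying the JAR into place saves a class-not-found surprise later.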


Prepare a test table and data in the source MySQL database:

    -- Create the table
    create table mytest_mysql_tab (id int primary key,name varchar(10),age int);
    -- Insert the initial data
    insert into mytest_mysql_tab values(1101,'suxing',25),(1102,'haha',20),(1103,'zaza',30);
    insert into mytest_mysql_tab values(1104,'susu',35),(1105,'kaka',40),(1106,'papa',45);
    -- More rows to be inserted after the sync is running
    insert into mytest_mysql_tab values(1107,'zuzu',35),(1108,'mumu',40),(1109,'pupu',45);
    insert into mytest_mysql_tab values(1110,'coco',30);
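One prerequisite this walkthrough assumes: mysql-cdc reads MySQL's binlog, so the binlog must be enabled in ROW format, and the replication user (`myrepl` here, matching the SMT config further down) needs the usual replication privileges. A sketch of the checks and grants to run on the source MySQL — the user name and password are this post's examples, not requirements:

```sql
-- Check binlog settings: log_bin should be ON and binlog_format ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

-- Create the replication user referenced in the SMT config
CREATE USER 'myrepl'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'myrepl'@'%';
FLUSH PRIVILEGES;
```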


    Now the configuration process, roughly:

    1. Install Flink

    I used the flink-1.14.5-bin-scala_2.11.tar.gz package. After extracting it, Flink can be started directly with the default configuration.

      /usr/local/flink-1.14.5/bin/start-cluster.sh
        Starting cluster.
        Starting standalonesession daemon on host startestdb01.
        Starting taskexecutor daemon on host startestdb01.
        root@startestdb01:/usr/local/flink-1.14.5#
        root@startestdb01:/usr/local/flink-1.14.5#
        root@startestdb01:/usr/local/flink-1.14.5# ps -ef |grep flink
        root 48808 1 52 22:39 pts/0 00:00:06 usr/local/jdk-11.0.1/bin/java -Xmx234881024 -Xms234881024 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/usr/local/flink-1.14.5/log/flink-root-standalonesession-1-startestdb01.log -Dlog4j.configuration=file:/usr/local/flink-1.14.5/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/local/flink-1.14.5/conf/log4j.properties -Dlogback.configurationFile=file:/usr/local/flink-1.14.5/conf/logback.xml -classpath usr/local/flink-1.14.5/lib/flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar:/usr/local/flink-1.14.5/lib/flink-csv-1.14.5.jar:/usr/local/flink-1.14.5/lib/flink-json-1.14.5.jar:/usr/local/flink-1.14.5/lib/flink-shaded-zookeeper-3.4.14.jar:/usr/local/flink-1.14.5/lib/flink-sql-connector-mysql-cdc-3.0.1.jar:/usr/local/flink-1.14.5/lib/flink-sql-connector-sqlserver-cdc-3.0.1.jar:/usr/local/flink-1.14.5/lib/flink-table_2.11-1.14.5.jar:/usr/local/flink-1.14.5/lib/log4j-1.2-api-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-api-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-core-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar:/usr/local/flink-1.14.5/lib/flink-dist_2.11-1.14.5.jar::: org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir usr/local/flink-1.14.5/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=234881024b -D jobmanager.memory.jvm-overhead.max=201326592b
        root 49078 1 59 22:39 pts/0 00:00:05 usr/local/jdk-11.0.1/bin/java -XX:+UseG1GC -Xmx145961776 -Xms145961776 -XX:MaxDirectMemorySize=201326592 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/usr/local/flink-1.14.5/log/flink-root-taskexecutor-0-startestdb01.log -Dlog4j.configuration=file:/usr/local/flink-1.14.5/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/local/flink-1.14.5/conf/log4j.properties -Dlogback.configurationFile=file:/usr/local/flink-1.14.5/conf/logback.xml -classpath usr/local/flink-1.14.5/lib/flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar:/usr/local/flink-1.14.5/lib/flink-csv-1.14.5.jar:/usr/local/flink-1.14.5/lib/flink-json-1.14.5.jar:/usr/local/flink-1.14.5/lib/flink-shaded-zookeeper-3.4.14.jar:/usr/local/flink-1.14.5/lib/flink-sql-connector-mysql-cdc-3.0.1.jar:/usr/local/flink-1.14.5/lib/flink-sql-connector-sqlserver-cdc-3.0.1.jar:/usr/local/flink-1.14.5/lib/flink-table_2.11-1.14.5.jar:/usr/local/flink-1.14.5/lib/log4j-1.2-api-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-api-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-core-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar:/usr/local/flink-1.14.5/lib/flink-dist_2.11-1.14.5.jar::: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir usr/local/flink-1.14.5/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=231525584b -D taskmanager.memory.task.heap.size=11744048b -D taskmanager.numberOfTaskSlots=1 -D taskmanager.memory.jvm-overhead.max=201326592b
        root 49193 48439 0 22:39 pts/0 00:00:00 grep --color=auto flink
        root@startestdb01:/usr/local/flink-1.14.5#


        2. Put the source and sink connector JARs into Flink's lib directory

          mv flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar /usr/local/flink-1.14.5/lib
          mv flink-sql-connector-mysql-cdc-3.0.1.jar /usr/local/flink-1.14.5/lib


          ## Flink must be restarted to pick up the new JARs
          /usr/local/flink-1.14.5/bin/stop-cluster.sh
          /usr/local/flink-1.14.5/bin/start-cluster.sh

          3. Extract the StarRocks SMT tool into /usr/local/flink-1.14.5

            tar -xvf smt.tar
            mv smt /usr/local/flink-1.14.5

            4. Edit the SMT configuration file

              cd /usr/local/flink-1.14.5/smt/conf
              vi config_prod.conf
              [db]
              host = 172.17.7.11
              port = 3306
              user = myrepl
              password = password
              # currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`, `sqlserver`, `tidb`
              type = mysql
              # # only takes effect on `type == hive`.
              # # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
              # authentication = kerberos


              [other]
              # number of backends in StarRocks
              be_num = 2
              # `decimal_v3` is supported since StarRocks-1.8.1
              use_decimal_v3 = true
              # directory to save the converted DDL SQL
              output_dir = ./result




              # !!!`database` `table` `schema` are case sensitive in `oracle`!!!
              [table-rule.1]
              # pattern to match databases for setting properties
              # !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!!
              database = mytest
              # pattern to match tables for setting properties
              table = mytest_mysql_tab
              # `schema` only takes effect on `postgresql` and `oracle` and `sqlserver`
              schema = ^.*$


              ############################################
              ### starrocks table configurations
              ############################################
              # # set a column as the partition_key
              # partition_key = p_key
              # # override the auto-generated partitions
              # partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day)
              # # only take effect on tables without primary keys or unique indexes
              # duplicate_keys=k1,k2
              # # override the auto-generated distributed keys
              # distributed_by=k1,k2
              # # override the auto-generated distributed buckets
              # bucket_num=32
              # # properties.xxxxx: properties used to create tables
              # properties.in_memory = false


              ############################################
              ### flink sink configurations
              ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
              ############################################
              flink.starrocks.jdbc-url=jdbc:mysql://172.17.199.36:9030
              flink.starrocks.load-url=172.17.199.36:8030
              flink.starrocks.username=root
              flink.starrocks.password=password
              flink.starrocks.sink.max-retries=10
              flink.starrocks.sink.buffer-flush.interval-ms=15000
              flink.starrocks.sink.properties.format=json
              flink.starrocks.sink.properties.strip_outer_array=true
              # # used to set the server-id for mysql-cdc jobs instead of using a random server-id
              # flink.cdc.server-id = 5000


              ############################################
              ### flink-cdc configuration for `tidb`
              ############################################
              # # Only takes effect on TiDB before v4.0.0.
              # # TiKV cluster's PD address.
              # flink.cdc.pd-addresses = 127.0.0.1:2379


              ############################################
              ### flink-cdc plugin configuration for `postgresql`
              ############################################
              # # for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
              # # refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
              # # and https://debezium.io/documentation/reference/postgres-plugins.html
              # flink.cdc.decoding.plugin.name = decoderbufs

              5. Generate the SQL for the source and sink tables

                cd /usr/local/flink-1.14.5/smt
                ./starrocks-migrate-tool


                root@startestdb01:/usr/local/flink-1.14.5/smt# ls -lrt result
                total 24
                -rw-r--r-- 1 root root 412 Mar 28 15:09 starrocks-external-create.all.sql
                -rw-r--r-- 1 root root 412 Mar 28 15:09 starrocks-external-create.1.sql
                -rw-r--r-- 1 root root 315 Mar 28 15:09 starrocks-create.1.sql
                -rw-r--r-- 1 root root 1152 Mar 28 15:09 flink-create.all.sql
                -rw-r--r-- 1 root root 1152 Mar 28 15:09 flink-create.1.sql
                -rw-r--r-- 1 root root 305 Mar 28 15:11 starrocks-create.all.sql

                6. Edit starrocks-create.all.sql and load it into StarRocks

                  -- Adjust into CREATE DATABASE/TABLE statements valid for StarRocks
                  cat >starrocks-create.all.sql
                  CREATE DATABASE IF NOT EXISTS `mytest`;


                  CREATE TABLE IF NOT EXISTS `mytest`.`mytest_mysql_tab` (
                  `id` INT NOT NULL COMMENT "",
                  `name` STRING NULL COMMENT "",
                  `age` INT NULL COMMENT ""
                  ) ENGINE=olap
                  PRIMARY KEY(`id`)
                  COMMENT ""
                  DISTRIBUTED BY HASH(`id`)
                  PROPERTIES (
                  "replication_num" = "2"
                  );




                  mysql -h127.0.0.1 -P9030 -uroot -p < starrocks-create.all.sql
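After the import it is worth confirming the table landed as expected. Both statements below are standard and can be run through the same mysql client session against the StarRocks FE:

```sql
-- Confirm the database and table now exist in StarRocks
SHOW DATABASES;
SHOW CREATE TABLE `mytest`.`mytest_mysql_tab`;
```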

                  7. Edit flink-create.all.sql and start the sync

                  flink-create.all.sql contains the source table and sink table definitions, along with the connection details for both ends, as shown below.

                    -- Adjust the source table and sink table statements
                    cat flink-create.all.sql
                    CREATE DATABASE IF NOT EXISTS `default_catalog`.`mytest`;


                    CREATE TABLE IF NOT EXISTS `default_catalog`.`mytest`.`mytest_mysql_tab_src` (
                    `id` INT NOT NULL,
                    `name` STRING NULL,
                    `age` INT NULL,
                    PRIMARY KEY(`id`)
                    NOT ENFORCED
                    ) with (
                    'database-name' = 'mytest',
                    'table-name' = 'mytest_mysql_tab',
                    'connector' = 'mysql-cdc',
                    'hostname' = '172.17.7.11',
                    'port' = '3306',
                    'username' = 'myrepl',
                    'password' = 'password'
                    );


                    CREATE TABLE IF NOT EXISTS `default_catalog`.`mytest`.`mytest_mysql_tab_sink` (
                    `id` INT NOT NULL,
                    `name` STRING NULL,
                    `age` INT NULL,
                    PRIMARY KEY(`id`)
                    NOT ENFORCED
                    ) with (
                    'sink.properties.format' = 'json',
                    'load-url' = '172.17.199.36:8030',
                    'sink.properties.strip_outer_array' = 'true',
                    'connector' = 'starrocks',
                    'table-name' = 'mytest_mysql_tab',
                    'sink.buffer-flush.interval-ms' = '15000',
                    'password' = 'password',
                    'sink.max-retries' = '10',
                    'jdbc-url' = 'jdbc:mysql://172.17.199.36:9030',
                    'database-name' = 'mytest',
                    'username' = 'root'
                    );
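Besides the two CREATE TABLE statements, the generated script ends with an INSERT that wires the source to the sink — submitting it is what creates the long-running sync job:

```sql
-- The continuous query that becomes the Flink streaming job: every change
-- captured from the MySQL source table is applied to the StarRocks sink table.
INSERT INTO `default_catalog`.`mytest`.`mytest_mysql_tab_sink`
SELECT * FROM `default_catalog`.`mytest`.`mytest_mysql_tab_src`;
```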




                    -- Start the sync
                    root@startestdb01:/usr/local/flink-1.14.5# ./bin/sql-client.sh -f /usr/local/flink-1.14.5/smt/result/flink-create.all.sql
                    [INFO] Executing SQL from file.
                    Flink SQL> CREATE DATABASE IF NOT EXISTS `default_catalog`.`mytest`;
                    [INFO] Execute statement succeed.
                    Flink SQL>
                    CREATE TABLE IF NOT EXISTS `default_catalog`.`mytest`.`mytest_mysql_tab_src` (
                    `id` INT NOT NULL,
                    `name` STRING NULL,
                    `age` INT NULL,
                    PRIMARY KEY(`id`)
                    NOT ENFORCED
                    ) with (
                    'password' = 'password',
                    'database-name' = 'mytest',
                    'table-name' = 'mytest_mysql_tab',
                    'connector' = 'mysql-cdc',
                    'hostname' = '172.17.7.11',
                    'port' = '3306',
                    'username' = 'myrepl'
                    );
                    [INFO] Execute statement succeed.
                    Flink SQL>
                    CREATE TABLE IF NOT EXISTS `default_catalog`.`mytest`.`mytest_mysql_tab_sink` (
                    `id` INT NOT NULL,
                    `name` STRING NULL,
                    `age` INT NULL,
                    PRIMARY KEY(`id`)
                    NOT ENFORCED
                    ) with (
                    'jdbc-url' = 'jdbc:mysql://172.17.199.36:9030',
                    'username' = 'root',
                    'connector' = 'starrocks',
                    'table-name' = 'mytest_mysql_tab',
                    'sink.buffer-flush.interval-ms' = '15000',
                    'sink.properties.strip_outer_array' = 'true',
                    'load-url' = '172.17.199.36:8030',
                    'password' = 'password',
                    'sink.properties.format' = 'json',
                    'database-name' = 'mytest',
                    'sink.max-retries' = '10'
                    );
                    [INFO] Execute statement succeed.
                    Flink SQL>
                    INSERT INTO `default_catalog`.`mytest`.`mytest_mysql_tab_sink` SELECT * FROM `default_catalog`.`mytest`.`mytest_mysql_tab_src`;
                    [INFO] Submitting SQL update statement to the cluster...
                    WARNING: An illegal reflective access operation has occurred
                    WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/usr/local/flink-1.14.5/lib/flink-dist_2.11-1.14.5.jar) to field java.lang.String.value
                    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
                    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
                    WARNING: All illegal access operations will be denied in a future release
                    [INFO] SQL update statement has been successfully submitted to the cluster:
                    Job ID: 82b8fffbf8f4431202eaa64523968a17




                    Shutting down the session...
                    done.

                    8. Check the Flink sync job

                      root@startestdb01:/usr/local/flink-1.14.5# bin/flink list
                      Waiting for response...
                      ------------------ Running/Restarting Jobs -------------------
                      27.03.2024 10:27:12 : 82b8fffbf8f4431202eaa64523968a17 : insert-into_default_catalog.mytest.mytest_mysql_tab_sink (RUNNING)
                      --------------------------------------------------------------
                      No scheduled jobs.
                      root@startestdb01:/usr/local/flink-1.14.5#
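If the job ever needs to be stopped, its ID can be taken straight from the `flink list` output. A small sketch — the sample line is the one printed above, and the final `cancel` call is commented out because it needs the live cluster:

```shell
# A job line as printed by `bin/flink list` above
line="27.03.2024 10:27:12 : 82b8fffbf8f4431202eaa64523968a17 : insert-into_default_catalog.mytest.mytest_mysql_tab_sink (RUNNING)"

# The job ID is the second ' : '-separated field
job_id=$(echo "$line" | awk -F' : ' '{print $2}')
echo "$job_id"
# prints: 82b8fffbf8f4431202eaa64523968a17

# /usr/local/flink-1.14.5/bin/flink cancel "$job_id"   # stops the sync job
```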

                      ## From the job submission in step 7 above, you can see the Flink sync job's ID is 82b8fffbf8f4431202eaa64523968a17.

                      9. Verify the data sync on the source and target ends

                        -- Check mytest_mysql_tab on the source side
                        mysql> select * from mytest_mysql_tab;
                        +------+--------+------+
                        | id   | name   | age  |
                        +------+--------+------+
                        | 1106 | papa   |   45 |
                        | 1103 | zaza   |   30 |
                        | 1105 | kaka   |   40 |
                        | 1101 | suxing |   25 |
                        | 1102 | haha   |   20 |
                        | 1104 | susu   |   35 |
                        +------+--------+------+
                        6 rows in set (0.03 sec)


                        -- Check mytest_mysql_tab on the target side
                        mysql> select * from mytest_mysql_tab;
                        +------+--------+------+
                        | id   | name   | age  |
                        +------+--------+------+
                        | 1106 | papa   |   45 |
                        | 1101 | suxing |   25 |
                        | 1102 | haha   |   20 |
                        | 1104 | susu   |   35 |
                        | 1103 | zaza   |   30 |
                        | 1105 | kaka   |   40 |
                        +------+--------+------+
                        6 rows in set (0.02 sec)


                        -- Insert new rows into mytest_mysql_tab on the source side
                        insert into mytest_mysql_tab values(1107,'zuzu',35),(1108,'mumu',40),(1109,'pupu',45);


                        -- Check mytest_mysql_tab on the target side again
                        mysql> select * from mytest_mysql_tab;
                        +------+--------+------+
                        | id   | name   | age  |
                        +------+--------+------+
                        | 1106 | papa   |   45 |
                        | 1108 | mumu   |   40 |
                        | 1101 | suxing |   25 |
                        | 1107 | zuzu   |   35 |
                        | 1109 | pupu   |   45 |
                        | 1102 | haha   |   20 |
                        | 1104 | susu   |   35 |
                        | 1103 | zaza   |   30 |
                        | 1105 | kaka   |   40 |
                        +------+--------+------+
                        9 rows in set (0.02 sec)



                        As shown above, Flink replicates the source MySQL table's data to the target StarRocks database in real time. The mysql-cdc connector does this by tailing MySQL's binlog, which is what makes the sync real-time; compared with DataX's CDC capability, it is more efficient.
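Beyond eyeballing individual rows, a quick consistency check is to compare row counts on the two ends; the same statement works in both the MySQL and StarRocks clients:

```sql
-- Run on both source and target; the counts should converge
-- once the CDC job has caught up.
SELECT COUNT(*) FROM mytest.mytest_mysql_tab;
```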

                        (Figure: comparison of CDC approaches across the various sync tools)

                        That wraps up this simple example of using Flink to sync MySQL data into StarRocks.

