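This post briefly shows how to use Flink to synchronize data from MySQL to a StarRocks database. The diagram below gives an overview of the main steps and components.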

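First, the components involved: the StarRocks SMT (StarRocks Migration Tool), Flink itself, and the two Flink connectors for the source and target ends. Both connectors are JAR files: flink-sql-connector-mysql-cdc-3.0.1.jar and flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar. The source-side (MySQL CDC) connector is not very version-sensitive, but the StarRocks connector for the target end has comparatively strict version requirements.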

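The flink-connector-starrocks JAR file name (x.x.x_flink-y.yy_z.zz.jar) contains three version numbers:
The first, x.x.x, is the version of flink-connector-starrocks itself.
The second, y.yy, is the Flink version it supports.
The third, z.zz, is the Scala version of that Flink build. For Flink 1.14.x and earlier, you must download a flink-connector-starrocks JAR that includes the Scala version number.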
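As a quick illustration of this naming convention, the sketch below splits a connector JAR name into its three version numbers and checks whether a given Flink version needs the Scala-suffixed build. The function names parse_starrocks_connector_jar and needs_scala_suffix are hypothetical helpers, not part of any StarRocks or Flink tooling.

```shell
#!/usr/bin/env bash
# Hypothetical helper: split a flink-connector-starrocks JAR name following the
# x.x.x_flink-y.yy_z.zz.jar convention into "<connector> <flink> <scala>".
# Prints "none" for the Scala part when the name has no Scala suffix.
parse_starrocks_connector_jar() {
  local name re
  name=$(basename "$1")
  re='^flink-connector-starrocks-([0-9.]+)_flink-([0-9.]+)(_([0-9.]+))?\.jar$'
  if [[ $name =~ $re ]]; then
    echo "${BASH_REMATCH[1]} ${BASH_REMATCH[2]} ${BASH_REMATCH[4]:-none}"
  else
    echo "unrecognized JAR name: $name" >&2
    return 1
  fi
}

# Hypothetical helper: succeeds (exit 0) when the Flink version is 1.14.x or
# earlier, i.e. when the connector JAR must carry a Scala version suffix.
needs_scala_suffix() {
  local major minor
  IFS=. read -r major minor _ <<< "$1"
  (( major < 1 || (major == 1 && minor <= 14) ))
}
```

For example, `parse_starrocks_connector_jar flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar` prints `1.2.3 1.14 2.11`, and `needs_scala_suffix 1.14.5` succeeds, which matches pairing the Scala 2.11 connector with Flink 1.14.5 in this article.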
Prepare a test table and data in the source MySQL database:
-- Create the table (in database mytest)
create table mytest_mysql_tab (id int primary key, name varchar(10), age int);
-- Write the initial data
insert into mytest_mysql_tab values (1101,'suxing',25),(1102,'haha',20),(1103,'zaza',30);
insert into mytest_mysql_tab values (1104,'susu',35),(1105,'kaka',40),(1106,'papa',45);
-- Keep writing new data after the sync has started
insert into mytest_mysql_tab values (1107,'zuzu',35),(1108,'mumu',40),(1109,'pupu',45);
insert into mytest_mysql_tab values (1110,'coco',30);
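Because this table is read by the mysql-cdc connector, which consumes the MySQL binlog, the source server must have binlog enabled in ROW format. A minimal check, assuming the mysql command-line client is available on the Flink host; check_binlog_format is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads the tab-separated output of
#   mysql -N -B -e "SHOW VARIABLES LIKE 'binlog_format'"
# on stdin and succeeds only if the value is ROW, as mysql-cdc requires.
check_binlog_format() {
  local name value
  IFS=$'\t' read -r name value || [[ -n $name ]] || return 1
  if [[ $name == binlog_format && $value == ROW ]]; then
    echo "ok: binlog_format=$value"
  else
    echo "binlog_format is '${value:-unset}', the mysql-cdc source needs ROW" >&2
    return 1
  fi
}
```

Usage with the source host and account from this example: `mysql -h 172.17.7.11 -u myrepl -p -N -B -e "SHOW VARIABLES LIKE 'binlog_format'" | check_binlog_format`.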
The configuration steps are as follows:
1. Install Flink
I used the package flink-1.14.5-bin-scala_2.11.tar.gz. After extracting it, start Flink with the default configuration:
/usr/local/flink-1.14.5/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host startestdb01.
Starting taskexecutor daemon on host startestdb01.
root@startestdb01:/usr/local/flink-1.14.5# ps -ef |grep flink
root 48808 1 52 22:39 pts/0 00:00:06 /usr/local/jdk-11.0.1/bin/java -Xmx234881024 -Xms234881024 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/usr/local/flink-1.14.5/log/flink-root-standalonesession-1-startestdb01.log -Dlog4j.configuration=file:/usr/local/flink-1.14.5/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/local/flink-1.14.5/conf/log4j.properties -Dlogback.configurationFile=file:/usr/local/flink-1.14.5/conf/logback.xml -classpath /usr/local/flink-1.14.5/lib/flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar:/usr/local/flink-1.14.5/lib/flink-csv-1.14.5.jar:/usr/local/flink-1.14.5/lib/flink-json-1.14.5.jar:/usr/local/flink-1.14.5/lib/flink-shaded-zookeeper-3.4.14.jar:/usr/local/flink-1.14.5/lib/flink-sql-connector-mysql-cdc-3.0.1.jar:/usr/local/flink-1.14.5/lib/flink-sql-connector-sqlserver-cdc-3.0.1.jar:/usr/local/flink-1.14.5/lib/flink-table_2.11-1.14.5.jar:/usr/local/flink-1.14.5/lib/log4j-1.2-api-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-api-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-core-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar:/usr/local/flink-1.14.5/lib/flink-dist_2.11-1.14.5.jar::: org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir /usr/local/flink-1.14.5/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=234881024b -D jobmanager.memory.jvm-overhead.max=201326592b
root 49078 1 59 22:39 pts/0 00:00:05 /usr/local/jdk-11.0.1/bin/java -XX:+UseG1GC -Xmx145961776 -Xms145961776 -XX:MaxDirectMemorySize=201326592 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/usr/local/flink-1.14.5/log/flink-root-taskexecutor-0-startestdb01.log -Dlog4j.configuration=file:/usr/local/flink-1.14.5/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/local/flink-1.14.5/conf/log4j.properties -Dlogback.configurationFile=file:/usr/local/flink-1.14.5/conf/logback.xml -classpath /usr/local/flink-1.14.5/lib/flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar:/usr/local/flink-1.14.5/lib/flink-csv-1.14.5.jar:/usr/local/flink-1.14.5/lib/flink-json-1.14.5.jar:/usr/local/flink-1.14.5/lib/flink-shaded-zookeeper-3.4.14.jar:/usr/local/flink-1.14.5/lib/flink-sql-connector-mysql-cdc-3.0.1.jar:/usr/local/flink-1.14.5/lib/flink-sql-connector-sqlserver-cdc-3.0.1.jar:/usr/local/flink-1.14.5/lib/flink-table_2.11-1.14.5.jar:/usr/local/flink-1.14.5/lib/log4j-1.2-api-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-api-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-core-2.17.1.jar:/usr/local/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar:/usr/local/flink-1.14.5/lib/flink-dist_2.11-1.14.5.jar::: org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /usr/local/flink-1.14.5/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=231525584b -D taskmanager.memory.task.heap.size=11744048b -D taskmanager.numberOfTaskSlots=1 -D taskmanager.memory.jvm-overhead.max=201326592b
root 49193 48439 0 22:39 pts/0 00:00:00 grep --color=auto flink
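The ps output above confirms that both the JobManager (StandaloneSessionClusterEntrypoint) and the TaskManager (TaskManagerRunner) JVMs are up. The sketch below turns that visual check into a small function; check_flink_standalone is a hypothetical name, and it only inspects process command lines, not whether the cluster is actually healthy.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads `ps -ef` output on stdin and checks that both
# standalone-cluster entry-point classes are present. Prints what is missing
# to stderr and returns 1 if either process is absent.
check_flink_standalone() {
  local ps_out cls missing=0
  ps_out=$(cat)
  for cls in \
      org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint \
      org.apache.flink.runtime.taskexecutor.TaskManagerRunner; do
    if ! grep -qF "$cls" <<< "$ps_out"; then
      echo "missing: $cls" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Usage: `ps -ef | check_flink_standalone && echo "Flink cluster is running"`.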
2. Put the source and target connector JARs into Flink's lib directory
mv flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar /usr/local/flink-1.14.5/lib
mv flink-sql-connector-mysql-cdc-3.0.1.jar /usr/local/flink-1.14.5/lib
## Restart Flink so that the new JARs are loaded
/usr/local/flink-1.14.5/bin/stop-cluster.sh
/usr/local/flink-1.14.5/bin/start-cluster.sh
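Step 2 can also be wrapped in a reusable function. This is a sketch assuming the Flink install directory is passed as the first argument (here /usr/local/flink-1.14.5); it copies rather than moves the JARs so the downloaded files are kept, and it restarts the cluster only when RESTART=1 is set. Both install_flink_connectors and the RESTART variable are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy connector JARs into <flink_home>/lib and, only when
# RESTART=1, restart the standalone cluster so the JVMs load the new classpath.
install_flink_connectors() {
  local flink_home=$1 jar
  shift
  local lib="$flink_home/lib"
  [[ -d $lib ]] || { echo "no lib directory under $flink_home" >&2; return 1; }
  for jar in "$@"; do
    [[ -f $jar ]] || { echo "JAR not found: $jar" >&2; return 1; }
    cp -- "$jar" "$lib/" || return 1   # cp instead of mv keeps the original file
  done
  if [[ ${RESTART:-0} == 1 ]]; then
    "$flink_home/bin/stop-cluster.sh" && "$flink_home/bin/start-cluster.sh"
  fi
}
```

Usage: `RESTART=1 install_flink_connectors /usr/local/flink-1.14.5 flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar flink-sql-connector-mysql-cdc-3.0.1.jar`.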
3. Extract StarRocks SMT into /usr/local/flink-1.14.5
tar -xvf smt.tar
mv smt /usr/local/flink-1.14.5
4. Edit the SMT configuration file
cd /usr/local/flink-1.14.5/smt/conf
vi config_prod.conf

[db]
host = 172.17.7.11
port = 3306
user = myrepl
password = password
# currently available types: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`, `sqlserver`, `tidb`
type = mysql
# # only takes effect on `type == hive`.
# # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
# authentication = kerberos

[other]
# number of backends in StarRocks
be_num = 2
# `decimal_v3` is supported since StarRocks-1.8.1
use_decimal_v3 = true
# directory to save the converted DDL SQL
output_dir = ./result

# !!!`database` `table` `schema` are case sensitive in `oracle`!!!
[table-rule.1]
# pattern to match databases for setting properties
# !!! database should be a `whole instance(or pdb) name` but not a regex when it comes with an `oracle db` !!!
database = mytest
# pattern to match tables for setting properties
table = mytest_mysql_tab
# `schema` only takes effect on `postgresql` and `oracle` and `sqlserver`
schema = ^.*$

############################################
### starrocks table configurations
############################################
# # set a column as the partition_key
# partition_key = p_key
# # override the auto-generated partitions
# partitions = START ("2021-01-02") END ("2021-01-04") EVERY (INTERVAL 1 day)
# # only take effect on tables without primary keys or unique indexes
# duplicate_keys=k1,k2
# # override the auto-generated distributed keys
# distributed_by=k1,k2
# # override the auto-generated distributed buckets
# bucket_num=32
# # properties.xxxxx: properties used to create tables
# properties.in_memory = false

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://172.17.199.36:9030
flink.starrocks.load-url=172.17.199.36:8030
flink.starrocks.username=root
flink.starrocks.password=password
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true
# # used to set the server-id for mysql-cdc jobs instead of using a random server-id
# flink.cdc.server-id = 5000

############################################
### flink-cdc configuration for `tidb`
############################################
# # Only takes effect on TiDB before v4.0.0.
# # TiKV cluster's PD address.
# flink.cdc.pd-addresses = 127.0.0.1:2379

############################################
### flink-cdc plugin configuration for `postgresql`
############################################
# # for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
# # refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
# # and https://debezium.io/documentation/reference/postgres-plugins.html
# flink.cdc.decoding.plugin.name = decoderbufs
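Comparing this file with the flink-create.all.sql shown in step 7, each flink.starrocks.* key ends up as a sink WITH option with the prefix removed (flink.starrocks.load-url becomes 'load-url', and so on), while connector, table-name and database-name are filled in by SMT. The function below is a hypothetical sketch that reproduces only that mapping, to preview the sink options before running SMT; it is not SMT's own implementation, and it does not escape single quotes inside values.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read an SMT config on stdin and print each
# flink.starrocks.<key>=<value> line as a Flink SQL WITH option: '<key>' = '<value>'.
# Commented lines (starting with #) do not match and are skipped.
starrocks_sink_options() {
  local line re
  re='^[[:space:]]*flink\.starrocks\.([^=[:space:]]+)[[:space:]]*=[[:space:]]*(.*[^[:space:]])[[:space:]]*$'
  while IFS= read -r line || [[ -n $line ]]; do
    [[ $line =~ $re ]] || continue
    printf "'%s' = '%s'\n" "${BASH_REMATCH[1]}" "${BASH_REMATCH[2]}"
  done
}
```

Usage: `starrocks_sink_options < /usr/local/flink-1.14.5/smt/conf/config_prod.conf`; for the config above the first line printed is `'jdbc-url' = 'jdbc:mysql://172.17.199.36:9030'`.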
5. Generate the SQL for the source and sink tables
cd /usr/local/flink-1.14.5/smt
./starrocks-migrate-tool
root@startestdb01:/usr/local/flink-1.14.5/smt# ls -lrt result
total 24
-rw-r--r-- 1 root root 412 Mar 28 15:09 starrocks-external-create.all.sql
-rw-r--r-- 1 root root 412 Mar 28 15:09 starrocks-external-create.1.sql
-rw-r--r-- 1 root root 315 Mar 28 15:09 starrocks-create.1.sql
-rw-r--r-- 1 root root 1152 Mar 28 15:09 flink-create.all.sql
-rw-r--r-- 1 root root 1152 Mar 28 15:09 flink-create.1.sql
-rw-r--r-- 1 root root 305 Mar 28 15:11 starrocks-create.all.sql
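Steps 6 and 7 only use the two merged scripts, starrocks-create.all.sql and flink-create.all.sql. A small hypothetical guard (check_smt_output) that fails if either is missing or empty, assuming SMT's output_dir is ./result as configured in step 4:

```shell
#!/usr/bin/env bash
# Hypothetical helper: verify that the SMT output directory contains non-empty
# starrocks-create.all.sql and flink-create.all.sql files.
check_smt_output() {
  local dir=$1 f missing=0
  for f in starrocks-create.all.sql flink-create.all.sql; do
    if [[ ! -s $dir/$f ]]; then
      echo "missing or empty: $dir/$f" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Usage: `check_smt_output /usr/local/flink-1.14.5/smt/result && echo "SMT output is ready"`.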
6. Edit starrocks-create.all.sql and load it into StarRocks
## Rewrite the file as database and table statements suited to StarRocks
cat > starrocks-create.all.sql <<'EOF'
CREATE DATABASE IF NOT EXISTS `mytest`;
CREATE TABLE IF NOT EXISTS `mytest`.`mytest_mysql_tab` (
  `id` INT NOT NULL COMMENT "",
  `name` STRING NULL COMMENT "",
  `age` INT NULL COMMENT ""
) ENGINE=olap
PRIMARY KEY(`id`)
COMMENT ""
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
  "replication_num" = "2"
);
EOF
mysql -h127.0.0.1 -P9030 -uroot -p < starrocks-create.all.sql
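The replication_num of 2 above matches be_num = 2 in the SMT config: StarRocks keeps each replica of a tablet on a different BE, so table creation fails if replication_num exceeds the number of available BEs. A hypothetical pre-check (check_replication_num), assuming the DDL spells the property as "replication_num" = "N" as above:

```shell
#!/usr/bin/env bash
# Hypothetical helper: fail if any "replication_num" = "N" in the DDL file has
# N greater than the given number of BEs.
check_replication_num() {
  local ddl_file=$1 be_num=$2 n bad=0
  [[ -r $ddl_file ]] || { echo "cannot read $ddl_file" >&2; return 1; }
  while read -r n; do
    if (( n > be_num )); then
      echo "replication_num=$n exceeds the $be_num available BEs" >&2
      bad=1
    fi
  done < <(grep -oE '"replication_num"[[:space:]]*=[[:space:]]*"[0-9]+"' "$ddl_file" | grep -oE '[0-9]+')
  return "$bad"
}
```

Usage: `check_replication_num starrocks-create.all.sql 2 && mysql -h127.0.0.1 -P9030 -uroot -p < starrocks-create.all.sql`.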
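7. Edit flink-create.all.sql and start the sync job
flink-create.all.sql contains the definitions of the source table and the sink table, together with the connection settings for the source and target ends, as shown below.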
## Adjust the source table and sink table statements
cat flink-create.all.sql
CREATE DATABASE IF NOT EXISTS `default_catalog`.`mytest`;

CREATE TABLE IF NOT EXISTS `default_catalog`.`mytest`.`mytest_mysql_tab_src` (
  `id` INT NOT NULL,
  `name` STRING NULL,
  `age` INT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'database-name' = 'mytest',
  'table-name' = 'mytest_mysql_tab',
  'connector' = 'mysql-cdc',
  'hostname' = '172.17.7.11',
  'port' = '3306',
  'username' = 'myrepl',
  'password' = 'password'
);

CREATE TABLE IF NOT EXISTS `default_catalog`.`mytest`.`mytest_mysql_tab_sink` (
  `id` INT NOT NULL,
  `name` STRING NULL,
  `age` INT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'sink.properties.format' = 'json',
  'load-url' = '172.17.199.36:8030',
  'sink.properties.strip_outer_array' = 'true',
  'connector' = 'starrocks',
  'table-name' = 'mytest_mysql_tab',
  'sink.buffer-flush.interval-ms' = '15000',
  'password' = 'password',
  'sink.max-retries' = '10',
  'jdbc-url' = 'jdbc:mysql://172.17.199.36:9030',
  'database-name' = 'mytest',
  'username' = 'root'
);

INSERT INTO `default_catalog`.`mytest`.`mytest_mysql_tab_sink` SELECT * FROM `default_catalog`.`mytest`.`mytest_mysql_tab_src`;

## Start the sync
root@startestdb01:/usr/local/flink-1.14.5# ./bin/sql-client.sh -f /usr/local/flink-1.14.5/smt/result/flink-create.all.sql
[INFO] Executing SQL from file.

Flink SQL> CREATE DATABASE IF NOT EXISTS `default_catalog`.`mytest`;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE IF NOT EXISTS `default_catalog`.`mytest`.`mytest_mysql_tab_src` (
  `id` INT NOT NULL,
  `name` STRING NULL,
  `age` INT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'password' = 'password',
  'database-name' = 'mytest',
  'table-name' = 'mytest_mysql_tab',
  'connector' = 'mysql-cdc',
  'hostname' = '172.17.7.11',
  'port' = '3306',
  'username' = 'myrepl'
);
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE IF NOT EXISTS `default_catalog`.`mytest`.`mytest_mysql_tab_sink` (
  `id` INT NOT NULL,
  `name` STRING NULL,
  `age` INT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
  'jdbc-url' = 'jdbc:mysql://172.17.199.36:9030',
  'username' = 'root',
  'connector' = 'starrocks',
  'table-name' = 'mytest_mysql_tab',
  'sink.buffer-flush.interval-ms' = '15000',
  'sink.properties.strip_outer_array' = 'true',
  'load-url' = '172.17.199.36:8030',
  'password' = 'password',
  'sink.properties.format' = 'json',
  'database-name' = 'mytest',
  'sink.max-retries' = '10'
);
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO `default_catalog`.`mytest`.`mytest_mysql_tab_sink` SELECT * FROM `default_catalog`.`mytest`.`mytest_mysql_tab_src`;
[INFO] Submitting SQL update statement to the cluster...
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/usr/local/flink-1.14.5/lib/flink-dist_2.11-1.14.5.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 82b8fffbf8f4431202eaa64523968a17

Shutting down the session...
done.
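Since sql-client.sh prints the Job ID of the submitted INSERT INTO job and then exits, a wrapper script may want to capture that ID. A hypothetical sketch (extract_flink_job_id), assuming the "Job ID: <32 hex characters>" output format shown above:

```shell
#!/usr/bin/env bash
# Hypothetical helper: read sql-client.sh output on stdin and print the last
# "Job ID: <32 hex chars>" value found (prints nothing if there is none).
extract_flink_job_id() {
  grep -oE 'Job ID: [0-9a-f]{32}' | tail -n 1 | awk '{print $3}'
}
```

Usage: `job_id=$(./bin/sql-client.sh -f /usr/local/flink-1.14.5/smt/result/flink-create.all.sql | extract_flink_job_id)`; the ID can later be passed to `./bin/flink cancel "$job_id"` to stop the sync.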
8. Check the Flink sync job
root@startestdb01:/usr/local/flink-1.14.5# bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
27.03.2024 10:27:12 : 82b8fffbf8f4431202eaa64523968a17 : insert-into_default_catalog.mytest.mytest_mysql_tab_sink (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
## The job ID printed when the sync was started in step 7, 82b8fffbf8f4431202eaa64523968a17, is the job listed here as RUNNING.
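To check the job from a script instead of reading the list by eye, the hypothetical helper flink_job_state below parses the `bin/flink list` layout shown above (date : job ID : job name (STATE)) and prints the state of one job ID. It assumes that exact layout, which may differ in other Flink versions.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `bin/flink list` output on stdin and print the state
# (e.g. RUNNING) of the job whose ID is given as $1; prints nothing if not listed.
flink_job_state() {
  awk -F' : ' -v id="$1" '$2 == id {
    s = $3
    sub(/.*\(/, "", s)   # drop the job name up to the opening parenthesis
    sub(/\).*/, "", s)   # drop the closing parenthesis
    print s
  }'
}
```

Usage: `bin/flink list | flink_job_state 82b8fffbf8f4431202eaa64523968a17` prints `RUNNING` for the job above.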
9. Verify the synchronized data on the source and target
-- Query mytest_mysql_tab on the source
mysql> select * from mytest_mysql_tab;
+------+--------+------+
| id   | name   | age  |
+------+--------+------+
| 1106 | papa   |   45 |
| 1103 | zaza   |   30 |
| 1105 | kaka   |   40 |
| 1101 | suxing |   25 |
| 1102 | haha   |   20 |
| 1104 | susu   |   35 |
+------+--------+------+
6 rows in set (0.03 sec)

-- Query mytest_mysql_tab on the target
mysql> select * from mytest_mysql_tab;
+------+--------+------+
| id   | name   | age  |
+------+--------+------+
| 1106 | papa   |   45 |
| 1101 | suxing |   25 |
| 1102 | haha   |   20 |
| 1104 | susu   |   35 |
| 1103 | zaza   |   30 |
| 1105 | kaka   |   40 |
+------+--------+------+
6 rows in set (0.02 sec)

-- Insert new rows into mytest_mysql_tab on the source
insert into mytest_mysql_tab values (1107,'zuzu',35),(1108,'mumu',40),(1109,'pupu',45);

-- Query mytest_mysql_tab on the target again
mysql> select * from mytest_mysql_tab;
+------+--------+------+
| id   | name   | age  |
+------+--------+------+
| 1106 | papa   |   45 |
| 1108 | mumu   |   40 |
| 1101 | suxing |   25 |
| 1107 | zuzu   |   35 |
| 1109 | pupu   |   45 |
| 1102 | haha   |   20 |
| 1104 | susu   |   35 |
| 1103 | zaza   |   30 |
| 1105 | kaka   |   40 |
+------+--------+------+
9 rows in set (0.02 sec)
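With more than a handful of rows, comparing primary keys is more reliable than eyeballing the two result sets. The hypothetical helper diff_synced_ids below takes two files with one id per line and reports ids present on only one side. Keep in mind that the sink flushes on sink.buffer-flush.interval-ms (15000 ms in this setup), so new rows can take several seconds to appear on the target.

```shell
#!/usr/bin/env bash
# Hypothetical helper: compare two files of ids (one per line) and report ids
# found only in the source file or only in the target file. Returns 0 if in sync.
diff_synced_ids() {
  local src=$1 dst=$2 only_src only_dst
  only_src=$(LC_ALL=C comm -23 <(LC_ALL=C sort -u "$src") <(LC_ALL=C sort -u "$dst"))
  only_dst=$(LC_ALL=C comm -13 <(LC_ALL=C sort -u "$src") <(LC_ALL=C sort -u "$dst"))
  if [[ -z $only_src && -z $only_dst ]]; then
    echo "in sync"
    return 0
  fi
  [[ -n $only_src ]] && echo "only in source: $(paste -s -d ' ' - <<< "$only_src")"
  [[ -n $only_dst ]] && echo "only in target: $(paste -s -d ' ' - <<< "$only_dst")"
  return 1
}
```

For example, export the ids with `mysql -h 172.17.7.11 -u myrepl -p -N -B -e "select id from mytest.mytest_mysql_tab" > src_ids.txt`, run the same query against StarRocks (`-h 172.17.199.36 -P 9030 -u root`) into dst_ids.txt, then call `diff_synced_ids src_ids.txt dst_ids.txt`.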
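As shown, Flink synchronizes the table data from the source MySQL database to the target StarRocks database in real time: the mysql-cdc source reads the MySQL binlog, so changes are captured as they happen. Compared with DataX's CDC capability, this approach is more efficient.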
Comparison of the CDC approaches of various sync tools

That concludes this simple example of using Flink to synchronize MySQL data to StarRocks.




