mysql同步到doris

下载JAR文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0
<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.15</artifactId><!--artifactId>flink-doris-connector-1.16</artifactId--><!--artifactId>flink-doris-connector-1.17</artifactId--><version>1.4.0</version></dependency>
例如,要将整个MySQL数据库mysql_db采集到Doris中(MySQL表名以tbl或test开头),只需执行以下命令(无需提前在Doris中创建表):
<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \lib/flink-doris-connector-1.16-1.4.0.jar \mysql-sync-database \--database test_db \--mysql-conf hostname=127.0.0.1 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=mysql_db \--including-tables "tbl|test.*" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=123456 \--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label1 \--table-conf replication_num=1



根据早期采用率的反馈,连接器还在生产环境中提供了高性能和10,000表数据库同步的系统稳定性。证明了Apache Doris与Flink CDC的结合能够高效、可靠地实现大规模数据同步。
工程师不再需要担心表的创建或表模式的维护,节省了繁琐和易出错的工作。在之前的Flink CDC中,你需要为每个表创建一个Flink作业,并在源端建立一个日志解析链接。现在,通过采集整个数据库,源数据库的资源消耗大大减少。它也是增量更新和完全更新的统一解决方案。
01
连接维度表和事实表
✦
通常的做法是将维度表放在Doris中,然后通过Flink的实时流运行连接查询。基于Flink的异步I/O特性,Flink- doris - connector 1.4.0实现了异步查询连接,因此Flink实时流不会因为查询而阻塞。此外,连接器允许将多个查询组合成一个大查询,并一次性将其发送给Doris进行处理。这提高了连接查询的效率和吞吐量。
02
Thrift SDK
✦
在连接器中引入了Thrift- service SDK,因此用户不再需要使用Thrift插件或在编译时配置Thrift环境。这使得编译过程简单得多。
03
Stream Load
✦
在数据同步期间,当没有新数据采集时,不会发出流加载请求。这避免了对集群资源的不必要消耗。
04
后端节点轮询
✦
对于数据采集,Doris调用一个前端节点来获取一个后端节点列表,然后随机选择一个启动采集请求。该后端节点将成为协调器。Flink- doris - connector 1.4.0允许用户启用轮询机制,即在每个Flink检查点使用不同的后端节点作为协调者,以避免单一后端节点长期承受过大压力。
05
支持更多数据类型
✦
除了常用的数据类型,Flink-Doris-Connector 1.4.0还支持Doris中的DecimalV3/DateV2/DateTimev2/Array/JSON。
01
从doris读取数据
✦
可以通过DataStream或FlinkSQL(有限流)从Doris中读取数据。支持谓词下推。
CREATE TABLE flink_doris_source (name STRING,age INT,score DECIMAL(5,2))WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = 'password','doris.filter.query' = 'age=18');SELECT * FROM flink_doris_source;
02
连接维度表和事实表
✦
CREATE TABLE fact_table (`id` BIGINT,`name` STRING,`city` STRING,`process_time` as proctime()) WITH ('connector' = 'kafka',...);create table dim_city(`city` STRING,`level` INT ,`province` STRING,`country` STRING) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','jdbc-url' = 'jdbc:mysql://127.0.0.1:9030','lookup.jdbc.async' = 'true','table.identifier' = 'dim.dim_city','username' = 'root','password' = '');SELECT a.id, a.name, a.city, c.province, c.country,c.levelFROM fact_table aLEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS cON a.city = c.city
03
写入doris
✦
CREATE TABLE doris_sink (name STRING,age INT,score DECIMAL(5,2))WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.label-prefix' = 'doris_label',/json write in'sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true');
点击下方

关注我们




