暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何自动同步整个MySQL数据库到doris?

大数据技能圈 2023-09-19
36

mysql同步到doris

Flink-Doris-Connector 1.4.0允许用户一步将包含数千张表的整个数据库(MySQL或Oracle)采集到Apache Doris实时分析数据库中。
通过内置的Flink CDC,连接器可以直接将表模式和数据从上游源同步到Apache Doris,这意味着用户不再需要在Doris中编写DataStream程序或预先创建映射表。
启动Flink作业时,连接器会自动检查源数据库和Apache Doris之间的数据是否等价。如果数据源中包含Doris中不存在的表,连接器将自动在Doris中创建相同的表,并利用Flink的辅助输出一次性采集多个表;如果源中有模式更改,它将自动获取DDL语句并在Doris中进行相同的模式更改。

1

快速开始

下载JAR文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0

Maven:
    <dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.15</artifactId>
    <!--artifactId>flink-doris-connector-1.16</artifactId-->
    <!--artifactId>flink-doris-connector-1.17</artifactId-->
    <version>1.4.0</version>
    </dependency>

    2

    如何使用

    例如,要将整个MySQL数据库mysql_db采集到Doris中(MySQL表名以tbl或test开头),只需执行以下命令(无需提前在Doris中创建表):

      <FLINK_HOME>/bin/flink run \
      -Dexecution.checkpointing.interval=10s \
      -Dparallelism.default=1 \
      -c org.apache.doris.flink.tools.cdc.CdcTools \
      lib/flink-doris-connector-1.16-1.4.0.jar \
      mysql-sync-database \
      --database test_db \
      --mysql-conf hostname=127.0.0.1 \
      --mysql-conf username=root \
      --mysql-conf password=123456 \
      --mysql-conf database-name=mysql_db \
      --including-tables "tbl|test.*" \
      --sink-conf fenodes=127.0.0.1:8030 \
      --sink-conf username=root \
      --sink-conf password=123456 \
      --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
      --sink-conf sink.label-prefix=label1 \
      --table-conf replication_num=1

      3

      性能如何

      当涉及到同步整个数据库(包含数百甚至数千张表,活动的或不活动的)时,大多数用户希望在几秒钟内完成。所以我们测试了连接器,看看它是否达到要求:
      1. 1000个MySQL表,每个表有100个字段。所有的表都是活动的(这意味着它们不断更新,每次数据写入涉及100多行)
      2. Flink任务检查点(checkpoint): 10秒
      在压力测试下,系统表现出较高的稳定性,关键指标如下:

      根据早期采用率的反馈,连接器还在生产环境中提供了高性能和10,000表数据库同步的系统稳定性。证明了Apache Doris与Flink CDC的结合能够高效、可靠地实现大规模数据同步。

      工程师不再需要担心表的创建或表模式的维护,节省了繁琐和易出错的工作。在之前的Flink CDC中,你需要为每个表创建一个Flink作业,并在源端建立一个日志解析链接。现在,通过采集整个数据库,源数据库的资源消耗大大减少。它也是增量更新和完全更新的统一解决方案。

      4

      其他功能

      01

      连接维度表和事实表

      通常的做法是将维度表放在Doris中,然后通过Flink的实时流运行连接查询。基于Flink的异步I/O特性,Flink- doris - connector 1.4.0实现了异步查询连接,因此Flink实时流不会因为查询而阻塞。此外,连接器允许将多个查询组合成一个大查询,并一次性将其发送给Doris进行处理。这提高了连接查询的效率和吞吐量。

      02

      Thrift SDK

      在连接器中引入了Thrift- service SDK,因此用户不再需要使用Thrift插件或在编译时配置Thrift环境。这使得编译过程简单得多。

      03

      Stream Load

      在数据同步期间,当没有新数据采集时,不会发出流加载请求。这避免了对集群资源的不必要消耗。

      04

      后端节点轮询

      对于数据采集,Doris调用一个前端节点来获取一个后端节点列表,然后随机选择一个启动采集请求。该后端节点将成为协调器。Flink- doris - connector 1.4.0允许用户启用轮询机制,即在每个Flink检查点使用不同的后端节点作为协调者,以避免单一后端节点长期承受过大压力。

      05

      支持更多数据类型

      除了常用的数据类型,Flink-Doris-Connector 1.4.0还支持Doris中的DecimalV3/DateV2/DateTimev2/Array/JSON。

      5

      案例展示

      01

      从doris读取数据

      可以通过DataStream或FlinkSQL(有限流)从Doris中读取数据。支持谓词下推。

        CREATE TABLE flink_doris_source (
        name STRING,
        age INT,
        score DECIMAL(5,2)
        )
        WITH (
        'connector' = 'doris',
        'fenodes' = '127.0.0.1:8030',
        'table.identifier' = 'database.table',
        'username' = 'root',
        'password' = 'password',
        'doris.filter.query' = 'age=18'
        );


        SELECT * FROM flink_doris_source;

        02

        连接维度表和事实表

          CREATE TABLE fact_table (
          `id` BIGINT,
          `name` STRING,
          `city` STRING,
          `process_time` as proctime()
          ) WITH (
          'connector' = 'kafka',
          ...
          );


          create table dim_city(
          `city` STRING,
          `level` INT ,
          `province` STRING,
          `country` STRING
          ) WITH (
          'connector' = 'doris',
          'fenodes' = '127.0.0.1:8030',
          'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
          'lookup.jdbc.async' = 'true',
          'table.identifier' = 'dim.dim_city',
          'username' = 'root',
          'password' = ''
          );


          SELECT a.id, a.name, a.city, c.province, c.country,c.level
          FROM fact_table a
          LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
          ON a.city = c.city

          03

          写入doris

            CREATE TABLE doris_sink (
            name STRING,
            age INT,
            score DECIMAL(5,2)
            )
            WITH (
            'connector' = 'doris',
            'fenodes' = '127.0.0.1:8030',
            'table.identifier' = 'database.table',
            'username' = 'root',
            'password' = '',
            'sink.label-prefix' = 'doris_label',
            /json write in
            'sink.properties.format' = 'json',
            'sink.properties.read_json_by_line' = 'true'
            );

            点击下方

            关注我们


            文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论