暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

数据平台:Apache Flink数据库实时同步

码奋 2025-01-29
139
 新年快乐,祝您发财,被爱,好运常在!

1. Flink 在数据库实时同步中的优势


(1)原生支持变更数据捕获(CDC)
Flink 社区提供了内置的 CDC 连接器(如 MySQL CDC、PostgreSQL CDC),可直接捕获数据库的增量变更(如 Binlog、WAL),无需额外中间件。
支持 Debezium 集成,通过 Kafka 等消息队列传递变更事件,实现解耦和缓冲。
(2)低延迟与高吞吐
Flink 的流处理引擎可实现毫秒级延迟,适合实时同步需求。
分布式架构支持横向扩展,轻松应对高吞吐场景(如每秒数万条数据变更)。
(3)Exactly-Once 语义保证
通过 Checkpoint 机制和两阶段提交(2PC),Flink 能确保数据从源端到目标端的端到端一致性,避免重复或丢失数据。
支持幂等写入到目标数据库(如 MySQL、Kafka、Elasticsearch),保障最终一致性。
(4)灵活的流处理能力
在数据同步过程中可嵌入 ETL 逻辑(如过滤、转换、聚合),例如实时清洗数据或关联多表变更。
支持复杂事件处理(CEP)和状态计算,适合需要实时计算的场景(如数据一致性校验)。
(5)丰富的连接器生态
支持主流数据库(MySQL、PostgreSQL、Oracle)和消息队列(Kafka、Pulsar)作为源或目标。
可扩展 API 允许自定义连接器,适配小众数据库。

2. 典型应用场景


(1)实时数仓与数据湖同步
将 OLTP 数据库的变更实时同步到 OLAP 系统(如 ClickHouse、Hive、Iceberg),支持实时分析。
(2)微服务数据共享
跨服务同步关键数据(如订单状态、用户信息),避免直接耦合数据库。
(3)异地多活与灾备
实时复制数据到异地数据中心,保障业务连续性。

3. 实现流程


3.1 配置源数据库的 CDC

以 MySQL 为例

  1. 启用 binlog:

    # my.cnf
    server-id=1
    log-bin=mysql-bin
    binlog_format=ROW
    binlog_row_image=FULL

    创建 Debezium 用户并授权:

      CREATE USER 'flink_user'@'%' IDENTIFIED BY 'password';
      GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user';

      3.2 部署 Debezium 并捕获变更到 Kafka

      1. 配置 Debezium MySQL Connector

        {
          "name""mysql-connector",
          "config": {
            "connector.class""io.debezium.connector.mysql.MySqlConnector",
            "database.hostname""localhost",
            "database.port""3306",
            "database.user""flink_user",
            "database.password""password",
            "database.server.id""184054",
            "database.server.name""dbserver1",
            "database.include.list""mydb",
            "table.include.list""mydb.users",
            "database.history.kafka.bootstrap.servers""kafka:9092",
            "database.history.kafka.topic""schema-changes.mydb"
          }
        }

        Debezium 将 binlog 转换为事件写入 Kafka Topic(如 dbserver1.mydb.users)。

        3.3 使用 Flink 处理变更数据

        Flink CDC Connector(推荐)
        直接通过 Flink CDC 读取数据库变更,无需 Kafka:
          -- Flink SQL
          CREATE TABLE users (
            id INT,
            name STRING,
            email STRING,
            PRIMARY KEY (id) NOT ENFORCED
          ) WITH (
            'connector' = 'mysql-cdc',
            'hostname' = 'localhost',
            'port' = '3306',
            'username' = 'flink_user',
            'password' = 'password',
            'database-name' = 'mydb',
            'table-name' = 'users'
          );


          CREATE TABLE target_db (
            id INT,
            name STRING,
            email STRING,
            PRIMARY KEY (id) NOT ENFORCED
          ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://target:3306/mydb',
            'table-name' = 'users',
            'username' = 'target_user',
            'password' = 'target_password'
          );


          -- 将数据同步到目标表
          INSERT INTO target_db SELECT * FROM users;
          PyFlink SQL 示例
            from pyflink.table import EnvironmentSettings, TableEnvironment


            # 初始化 Flink Table 环境
            env_settings = EnvironmentSettings.in_streaming_mode()
            t_env = TableEnvironment.create(env_settings)


            # 定义 MySQL CDC 源表
            t_env.execute_sql("""
            CREATE TABLE source_users (
                id INT,
                name STRING,
                email STRING,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'mysql-cdc',
                'hostname' = 'localhost',
                'port' = '3306',
                'username' = 'flink_user',
                'password' = 'password',
                'database-name' = 'mydb',
                'table-name' = 'users'
            )
            """)


            # 定义目标 PostgreSQL 表
            t_env.execute_sql("""
            CREATE TABLE target_users (
                id INT,
                name STRING,
                email STRING,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:postgresql://localhost:5432/target_db',
                'table-name' = 'users',
                'username' = 'user',
                'password' = 'password'
            )
            """)


            # 执行同步任务
            t_env.execute_sql("INSERT INTO target_users SELECT * FROM source_users")



            往期推荐
            01

            数据平台数据传输加密:对称、非对称与混合加密等的运用

            02

            数据中台的数据库实时同步:作用、性能消耗与优化策略

            03

            Zookeeper的介绍和集群安装


            文章转载自码奋,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论