暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用 Apache SeaTunnel 在 MySQL 和 HTTP 之间的数据同步示例

SeaTunnel 2024-10-10
1025

随着现代企业数据量的不断增长,跨系统、跨平台的数据同步需求变得愈发重要。

在实际的业务场景中,开发者常常需要将数据从 MySQL 同步到其他系统,或者从不同的数据源同步回 MySQL。Apache SeaTunnel 作为一款高效的分布式数据集成平台,支持批处理和流处理,能够灵活地完成这些任务。

本文将详细介绍如何使用 Apache SeaTunnel 实现以下几种常见的数据同步场景:

  • MySQL 同步到 HTTP 接口
  • MySQL 同步到 MySQL
  • HTTP 接口同步到 MySQL
  • MySQL-CDC 同步到 HTTP 接口

我们将逐一展示这些同步场景的配置方式,并提供清晰的代码示例,帮助开发者快速掌握 SeaTunnel 在不同场景下的应用。

官方文档参考

SeaTunnel JDBC Source Connector

前置准备

在开始之前,请确保已经下载了对应版本的 MySQL JDBC 驱动 mysql-connector-java-xxx.jar
,并将其放置在 SeaTunnel 的安装目录下的 lib
文件夹中。

可以从以下链接获取:https://mvnrepository.com/artifact/mysql/mysql-connector-java

对于使用 Spark 或 Flink 的 SeaTunnel 任务,也需要将该 JAR 包复制到相应的目录下:

  • Spark:  $SPARK_HOME/jars/
  • Flink:  $FLINK_HOME/lib/

接下来,我们将逐一展示四种数据同步的配置和代码示例。

MySQL 同步到 HTTP 接口

在此场景中,我们将 MySQL 数据表中的信息同步到指定的 HTTP 接口。

这里假设我们从 user_info
表中查询数据并通过 HTTP POST 请求将其发送到目标 API。

env {
  execution.parallelism = 2
  job.mode = "BATCH"  # MySQL 作为数据源,只支持批量同步
}

source {
   jdbc {
     url =  "jdbc:mysql://172.27.10.22:6033/test"
     driver = "com.mysql.cj.jdbc.Driver"
     connection_check_timeout_sec = 100
     user = "root"
     password = "root"
     query = "SELECT * FROM user_info ORDER BY create_time LIMIT 1"
     result_table_name = "user_info_out"
  }
}

transform {
    Sql {
      source_table_name = "user_info_out"
      result_table_name = "user_info_sink"
      query = "select info, user_name, age from user_info_out"
    }
}

sink {
  Console {
    source_table_name = "user_info_sink"
  }

  http {
    source_table_name = "user_info_sink"
    url = "https://test.test.com:8080/api/user/test"
    method = "POST"
    headers = {Accept="application/json", Content-Type="application/json;charset=utf-8"}
  }
}

MySQL 同步到 MySQL

在此示例中,我们将从一个 MySQL 数据库中提取数据,并将其同步到另一个 MySQL 数据库。此场景适用于多个数据库实例之间的数据迁移或备份。

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
    Jdbc {
        url =  "jdbc:mysql://172.27.10.22:6033/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 10
        user = "root"
        password = "root"
        query = "SELECT `name`,`score` FROM `user`"
        result_table_name = "user_info"
    }
}

sink {
  Jdbc {
        source_table_name = "user_info"
        url =  "jdbc:mysql://192.27.10.22:16033/temp_user"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "root"
        query = "INSERT INTO `student`(`name`, `score`) VALUES(?, ?)"
  }
}

HTTP 接口同步到 MySQL

本示例展示了如何将 HTTP 接口中的数据同步到 MySQL 数据库。

这在从第三方 API 获取数据并将其存储到本地数据库的场景中非常实用。

env {
  execution.parallelism = 2
  job.mode = "STREAMING"  # HTTP 作为数据源,支持批量和流式模式
  checkpoint.interval = 10000  # 执行间隔(毫秒)
}

source {
  Http {
    url = "https://test.test.com:8080/api/test"
    method = "GET"
    format = "json"
    headers = {Authorization="Bearer example-token", language="zh"}
    params = {userId="fa438165b2c84d8dbe9175d152718437"}
    content_field = "$.content.*"
    schema = {
      fields {
        userId = string
        age = int
        phone = string
        name = string
      }
    }
    result_table_name = "user_info"
  }
}

transform {
    Sql {
      source_table_name = "user_info"
      result_table_name = "user_info_out"
      query = "SELECT name as userName, userId, age, phone FROM user_info"
    }
}

sink {
  Jdbc {
     url = "jdbc:mysql://172.27.10.22:26033/test"
     driver = "com.mysql.cj.jdbc.Driver"
     connection_check_timeout_sec = 100
     user = "root"
     password = "root"
     source_table_name = "user_info_out"
     query = "INSERT INTO `user_bak`(`userName`, `userId`, `age`, `phone`) VALUES (?, ?, ?, ?)"
  }
}

MySQL-CDC 同步到 HTTP 接口

MySQL-CDC(Change Data Capture)允许实时捕获数据库中的数据变化。

在此示例中,我们将 MySQL 数据库中的变化通过 CDC 机制捕获,并将其同步到 HTTP 接口。

env {
  execution.parallelism = 2
  job.mode = "STREAMING"  # MySQL-CDC 支持批量和流式模式
  checkpoint.interval = 10000  # 执行间隔(毫秒)
}

source {
    MySQL-CDC {
      catalog = {
        factory = MySQL
      }
      base-url = "jdbc:mysql://${mysql_ip_port}/test?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"
      username = ${mysql_username}  # 使用变量替换
      password = ${mysql_pass}  # 使用变量替换
      table-names = ["test.user"]
      startup.mode = "initial"
      result_table_name = "user_info_out"
      table-names-config = [
        {
          table = "test.user"
          primaryKeys = ["user_id"]
        }
      ]
    }
}

transform {
    FilterRowKind {
      source_table_name = "user_info_out"
      result_table_name = "user_info_sink"
      include_kinds = ["UPDATE_AFTER""INSERT"]
    }
}

sink {
  http {
    source_table_name = "user_info_sink"
    url = "https://test.test.com:28080/api/user/test"
    method = "POST"
    headers = {Accept="application/json", Content-Type="application/json;charset=utf-8"}
  }
}

总结

通过 Apache SeaTunnel 的强大数据集成能力,开发者可以轻松实现多种数据源之间的同步操作。无论是数据库与 API 之间的数据传输,还是跨数据库的数据迁移,SeaTunnel 都为开发者提供了灵活、高效的解决方案。

希望通过本文的示例,您能够快速上手并在实际项目中 应用 SeaTunnel 进行复杂的数据同步任务。

SeaTunnel 提供的流处理和批处理模式极大地满足了多种场景下的数据处理需求,使得跨平台、跨数据源的数据集成变得更加简单、高效。


同步Demo

 MySQL→Doris
MySQLCDC
MySQL→Hive
 HTTP → Doris 

新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南
从 0 到 1 快速入门 Apache SeaTunnel 
初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel

 MySQL 同步到 Hive / 从MySQL同步到StarRocks
通过 SeaTunnel 将数据写入 OSS-HDFS 
MySQL 到 Elasticsearch 实时同步解决方案

启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 
 部署 Apache SeaTunnel 分布式集群
Apache SeaTunnel Web部署指南
基于Apache SeaTunnel构建CDC数据同步管道
【用户投稿】Apache SeaTunnel 2.3.3+Web 1.0.0版本安装部署
【安装部署】Apache SeaTunnel 和 Web快速安装详解
【保姆级教程】使用SeaTunnel同步Kafka的数据到ClickHouse
【数据同步】SeaTunnel初体验,5000字深入浅出带你用上Oracle-CDC

最佳实践

 OPPO 清风 天翼云 马蜂窝
孩子王 哔哩哔哩 唯品会
众安保险 兆原数通 亚信科技
映客 翼康济世 信也科技
华润置地

测试报告


 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!
最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!
比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布
SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

源码解析


Apache SeaTunnel Zeta引擎源码解析(一) Server端的初始化
Apache SeaTunnel Zeta引擎源码解析(二) Client端的任务提交流程
Apache SeaTunnel Zeta引擎源码解析(三) Server端接收任务的执行流程
从启动到关闭 | SeaTunnel2.1.1源码解析
SeaTunnel 2.1.2 封装 Flink 连接数据库的源码解析
那些年,我们在Apache SeaTunnel 2.1.0部署中踩过的坑【含源码分析】


Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台


仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel


文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论