【实现简单的逻辑】
Mysql数据同步到Hive,大致流程如下:

分为离线和实时两部分,我们先实现离线,需要以下内容:Flink,SeaTunnel,Mysql,Hive,Hadoop,Java。
离线Mysql到Hive数据同步
1)准备所需要的
2)开始
Mysql创建数据库及其内容
-- 创建数据库create database seatunnel;-- 进入seatunnel数据库use seatunnel;-- 创建表create table day_test(dname varchar(64),dage int);-- 插入数据insert into day_test values('张三',20);insert into day_test values('李四',18);insert into day_test values('王二',29);insert into day_test values('麻子',22);
数据库有数据,没什么太大问题,懒得删。

Hive创建接收数据表
#打开hivehive#新建数据库create database mydemo;#进入库use mydemo;#新建表create table hive_mysql(hname varchar(64),hage int);#查看当前表的内容select * from hive_mysql;
Hive里有数据,这个没什么影响,不用管它!

错误一
Hive的开启顺序是:先启动Mysql,再启动Hadoop集群,再启动Hive。
错误二
如果出现以下错误:

这说明你的Hive服务器没开,新开个页面,输入:
hive --service metastore &
不关闭这个页面就行了,放后台,这时候就行了。

修改SeaTunnel配置文件
这是我的SeaTunnel安装路径,你们换成自己的就行!
#进入Seatunnel目录下的confcd seatunnel/apache-seatunnel-incubating-2.3.0/conf#复制配置文件并改名,变为我们后面的启动文件cp seatunnel/apache-seatunnel-incubating-2.3.0/conf/seatunnel.streaming.conf.template seatunnel/apache-seatunnel-incubating-2.3.0/conf/example01.conf#打开文件修改vi example01.conf#保存并退出:wq
example01.conf文件内容如下:
env {execution.parallelism = 1}# 在source所属的块中配置数据源source {Jdbc {driver = "com.mysql.cj.jdbc.Driver"url = "jdbc:mysql://127.0.0.1:3306/seatunnel?serverTimezone=GMT%2b8&characterEncoding=utf-8"user = "root"password = "123456"query = "select * from day_test"}}# 在transform的块中声明转换插件transform {}# 在sink块中声明要输出到哪sink {Hive {table_name = "mydemo.hive_mysql"metastore_uri = "thrift://127.0.0.1:9083"schema {fields {hname = stringhage= int}}}}
使用Flink提交同步作业
cd seatunnel/apache-seatunnel-incubating-2.3.0#用我们刚配置的文件去启动作业./bin/start-seatunnel-flink-connector-v2.sh --config ./config/example01.conf

提交任务,任务完成。这时打开Flink的web页面:



这里就可以看见运行的结果,错误或者单纯查询可以显示在这,正确了没有显示,我这里是之前的测试,这时候再去Hive里查询字段:
select * from hive_mysql;
内容如下

自此离线完成,反过来也能同步,hive-->mysql
实时CDC挖取日志同步到Hive
Mysql CDC配置打开
<<< <<< 注意
Mysql CDC内容读取成功,但是报运行时错误,没法同步到Kafka,后续Kafka同步到Hive已经可以实现,解决了写上来。
Kafka挖取日志
kafka放入Sink里接收内容,配置如下:
sink {kafka {topic = "seatunnel"bootstrap.servers = "127.0.0.1:9092"partition = 3format = jsonkafka.request.timeout.ms = 60000semantics = EXACTLY_ONCE}}
Kafka同步数据到Hive
编写配置内容 kafka_hive.conf
#kafka要是json格式env {execution.parallelism = 1}# 在source所属的块中配置数据源source {Kafka {result_table_name = "kafka_name"schema = {fields {id = "int"name = "string"age = "int"}}format = jsonfield_delimiter = "#"topic = "mybate2"bootstrap.servers = "127.0.0.1:9092"kafka.max.poll.records = 500kafka.client.id = 127.0.0.1}}# 在transform的块中声明转换插件transform {}# 在sink块中声明要输出到哪sink {Hive {table_name = "mydemo.hive_mysql"metastore_uri = "thrift://127.0.0.1:9083"schema {fields {hid = inthname = stringhage= int}}}}
设置编写启动脚本
vim kafka_stop.sh
内容如下
#!/bin/sh#启动kafka挖取hive./bin/start-seatunnel-flink-connector-v2.sh --config ./config/example01.conf#等3秒后执行sleep 3#启动kafka同步数据到hive./bin/start-seatunnel-flink-connector-v2.sh --config ./config/kafka_hive.conf
为脚本添加权限
chmod +x cdc_hive.sh
提交Flink作业
sh cdc_hive.sh
这时打开Hive,输入Show Tables;就可以看到内容的变更!
版权声明:本文为CSDN博主「underworld33」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/underworld33/article/details/129178821

新手入门
SeaTunnel 让数据集成变得 So easy! / 3 分钟入门指南
初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel
MySQL 同步到 Hive / 从MySQL同步到StarRocks
MySQL 到 Elasticsearch 实时同步解决方案
启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta

最佳实践

测试报告
性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!
最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!
比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布
Apache SeaTunnel
Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台
仓库地址:
https://github.com/apache/seatunnel
网址:
https://seatunnel.apache.org/
Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!
我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
提交问题和建议:
https://github.com/apache/seatunnel/issues
贡献代码:
https://github.com/apache/seatunnel/pulls
订阅社区开发邮件列表 :
dev-subscribe@seatunnel.apache.org
开发邮件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
关注 Twitter:
https://twitter.com/ASFSeaTunnel




