暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【最佳实践】2个步骤教你从Mysql同步到Hive

SeaTunnel 2023-10-13
2274

【实现简单的逻辑】

Mysql数据同步到Hive,大致流程如下:

分为离线和实时两部分,我们先实现离线,需要以下内容:Flink,SeaTunnel,Mysql,Hive,Hadoop,Java。

离线Mysql到Hive数据同步

1)准备所需要的

2)开始

Mysql创建数据库及其内容


    -- 创建数据库
    create database seatunnel;


    -- 进入seatunnel数据库
    use seatunnel;


    -- 创建表
    create table day_test(
    dname varchar(64),
    dage int
    );


    -- 插入数据
    insert into day_test values('张三',20);
    insert into day_test values('李四',18);
    insert into day_test values('王二',29);
    insert into day_test values('麻子',22);


    数据库有数据,没什么太大问题,懒得删。



    Hive创建接收数据表


      #打开hive
      hive


      #新建数据库
      create database mydemo;


      #进入库
      use mydemo;


      #新建表
      create table hive_mysql(
      hname varchar(64),
      hage int
      );


      #查看当前表的内容
      select * from hive_mysql;

      Hive里有数据,这个没什么影响,不用管它!

      错误一

      Hive的开启顺序是:先启动Mysql,再启动Hadoop集群,再启动Hive。

      错误二

      如果出现以下错误:

      这说明你的Hive服务器没开,新开个页面,输入:

        hive --service metastore &

        不关闭这个页面就行了,放后台,这时候就行了。

        修改SeaTunnel配置文件

        这是我的SeaTunnel安装路径,你们换成自己的就行!

          #进入Seatunnel目录下的conf
          cd seatunnel/apache-seatunnel-incubating-2.3.0/conf


          #复制配置文件并改名,变为我们后面的启动文件
          cp seatunnel/apache-seatunnel-incubating-2.3.0/conf/seatunnel.streaming.conf.template seatunnel/apache-seatunnel-incubating-2.3.0/conf/example01.conf


          #打开文件修改
          vi example01.conf


          #保存并退出
          :wq

          example01.conf文件内容如下:

            env {
            execution.parallelism = 1
            }
            # 在source所属的块中配置数据源
            source {
            Jdbc {
            driver = "com.mysql.cj.jdbc.Driver"
            url = "jdbc:mysql://127.0.0.1:3306/seatunnel?serverTimezone=GMT%2b8&characterEncoding=utf-8"
            user = "root"
            password = "123456"
            query = "select * from day_test"
            }
            }
            # 在transform的块中声明转换插件
            transform {


            }
            # 在sink块中声明要输出到哪
            sink {
            Hive {
            table_name = "mydemo.hive_mysql"
            metastore_uri = "thrift://127.0.0.1:9083"
            schema {
            fields {
            hname = string
            hage= int
            }
            }
            }
            }

            使用Flink提交同步作业


              cd seatunnel/apache-seatunnel-incubating-2.3.0


              #用我们刚配置的文件去启动作业
              ./bin/start-seatunnel-flink-connector-v2.sh --config ./config/example01.conf

              提交任务,任务完成。这时打开Flink的web页面:


              这里就可以看见运行的结果,错误或者单纯查询可以显示在这,正确了没有显示,我这里是之前的测试,这时候再去Hive里查询字段:

                select * from hive_mysql;

                内容如下

                自此离线完成,反过来也能同步,hive-->mysql

                实时CDC挖取日志同步到Hive

                Mysql CDC配置打开

                <<< <<< 注意

                Mysql CDC内容读取成功,但是报运行时错误,没法同步到Kafka后续Kafka同步到Hive已经可以实现,解决了写上来。


                Kafka挖取日志

                kafka放入Sink里接收内容,配置如下:

                  sink {
                  kafka {
                  topic = "seatunnel"
                  bootstrap.servers = "127.0.0.1:9092"
                  partition = 3
                  format = json
                  kafka.request.timeout.ms = 60000
                  semantics = EXACTLY_ONCE
                  }
                  }


                  Kafka同步数据到Hive

                  编写配置内容 kafka_hive.conf

                    #kafka要是json格式
                    env {
                    execution.parallelism = 1
                    }
                    # 在source所属的块中配置数据源
                    source {
                    Kafka {
                    result_table_name = "kafka_name"
                    schema = {
                    fields {
                    id = "int"
                    name = "string"
                    age = "int"
                    }
                    }
                    format = json
                    field_delimiter = "#"
                    topic = "mybate2"
                    bootstrap.servers = "127.0.0.1:9092"
                    kafka.max.poll.records = 500
                    kafka.client.id = 127.0.0.1
                    }
                    }
                    # 在transform的块中声明转换插件
                    transform {


                    }
                    # 在sink块中声明要输出到哪
                    sink {
                    Hive {
                    table_name = "mydemo.hive_mysql"
                    metastore_uri = "thrift://127.0.0.1:9083"
                    schema {
                    fields {
                    hid = int
                    hname = string
                    hage= int
                    }
                    }
                    }
                    }

                    设置编写启动脚本

                      vim kafka_stop.sh

                      内容如下

                        #!/bin/sh
                        #启动kafka挖取hive
                        ./bin/start-seatunnel-flink-connector-v2.sh --config ./config/example01.conf


                        #等3秒后执行
                        sleep 3


                        #启动kafka同步数据到hive
                        ./bin/start-seatunnel-flink-connector-v2.sh --config ./config/kafka_hive.conf

                        为脚本添加权限

                          chmod +x cdc_hive.sh


                          提交Flink作业


                            sh cdc_hive.sh

                            这时打开Hive,输入Show Tables;就可以看到内容的变更! 

                            版权声明:本文为CSDN博主「underworld33」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

                            原文链接:https://blog.csdn.net/underworld33/article/details/129178821


                            新手入门

                             SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南

                            从 0 到 1 快速入门 Apache SeaTunnel 

                            初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel


                             MySQL 同步到 Hive / 从MySQL同步到StarRocks

                            通过 SeaTunnel 将数据写入 OSS-HDFS 

                            MySQL 到 Elasticsearch 实时同步解决方案


                            启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 

                             部署 Apache SeaTunnel 分布式集群

                             

                            最佳实践

                             OPPO 清风 天翼云 马蜂窝

                            孩子王 哔哩哔哩 唯品会


                            测试报告

                             性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!

                            最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!

                            比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布


                            Apache SeaTunnel





                            Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台


                            仓库地址: 

                            https://github.com/apache/seatunnel


                            网址:

                            https://seatunnel.apache.org/


                            Apache SeaTunnel 下载地址:

                            https://seatunnel.apache.org/download

                             

                            衷心欢迎更多人加入!


                            我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!


                            我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!


                            提交问题和建议:

                            https://github.com/apache/seatunnel/issues


                            贡献代码:

                            https://github.com/apache/seatunnel/pulls


                            订阅社区开发邮件列表 : 

                            dev-subscribe@seatunnel.apache.org


                            开发邮件列表:

                            dev@seatunnel.apache.org


                            加入 Slack:

                            https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ


                            关注 Twitter: 

                            https://twitter.com/ASFSeaTunnel

                            文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                            评论