
DolphinScheduler + Flink + SeaTunnel Integration Tutorial

大数据技能圈 2023-07-31

1. Software Versions

Flink: 1.16.1

DolphinScheduler: 3.1.7

SeaTunnel: 2.3.3-SNAPSHOT (fixes the bug in 2.3.2 where jobs could not be submitted when using the Flink engine)

2. Install Flink


1) Extract

    tar -zxvf flink-1.16.1-bin-scala_2.12.tgz -C /opt/software/flink
    mv flink-1.16.1 1.16.1

2) Configuration files

flink-conf.yaml:

    jobmanager.rpc.address: jobmanager_ip
    jobmanager.bind-host: 0.0.0.0
    taskmanager.bind-host: 0.0.0.0
    taskmanager.host: taskmanager_ip
    parallelism.default: x
    rest.address: 0.0.0.0
    rest.port: 8081
    rest.bind-address: 0.0.0.0
    env.pid.dir: /opt/log/flink/1.16.1/pids

masters:

    master_ip

workers:

    worker1_ip
    worker2_ip
    worker3_ip

zoo.cfg:

    tickTime=2000
    initLimit=10
    syncLimit=5
    clientPort=2181
    server.1=zookeeper1_ip:2888:3888
    server.2=zookeeper2_ip:2888:3888
    server.3=zookeeper3_ip:2888:3888

3) Distribute to the other nodes

    scp -rp 1.16.1 username@target_ip:/dir1/dir2/

(Passwordless SSH between the servers must be set up in advance.)
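The distribution step can also be scripted across all workers. The host list and user below are the tutorial's placeholders, not real values; the loop only echoes each scp command (a dry run), so drop the `echo` to actually copy.

```shell
# Placeholder worker hosts from the tutorial; replace with your node IPs.
workers="worker1_ip worker2_ip worker3_ip"
for h in $workers; do
  # Dry run: print the command. Remove 'echo' to perform the copy
  # (passwordless SSH must already be configured).
  echo scp -rp /opt/software/flink/1.16.1 "username@$h:/opt/software/flink/"
done
```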

4) Start the cluster

    ./start-cluster.sh

5) Check the web UI

    flink-ui-ip:8081

3. Install SeaTunnel

The latest SeaTunnel release at the time of writing is 2.3.2. Testing showed that when it is integrated with Flink 1.16.1, jobs cannot be submitted, and configuring a flink-cdc task fails with a missing-jar error, so you need to build the latest dev branch (seatunnel-dev) yourself. (This problem was not observed when integrating with Flink 1.13.x, 1.14.x, or 1.15.x.)

1) Download the source code:

2) Build the code

In a terminal, from the directory containing the top-level pom.xml, run command 1:

    mvn clean install

Note: if the build fails with formatting errors, run command 2:

    mvn spotless:apply

and, once it succeeds, re-run command 1.

After a successful build, apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz appears under seatunnel-dist/target/.

3) Upload apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz to the server

4) Extract

    tar -zxvf apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz -C /opt/seatunnel/

5) Configuration files

Configure the SeaTunnel cluster members.

hazelcast.yaml:

    member-list:
      - host1_ip
      - host2_ip
      - host3_ip

hazelcast-client.yaml:

    cluster-members:
      - host1_ip:5801
      - host2_ip:5801
      - host3_ip:5801

Configure the Flink home.

seatunnel-env.sh:

    FLINK_HOME=${FLINK_HOME:-/opt/software/flink/1.16.1}

6) Distribute to the other nodes

    scp -rp apache-seatunnel-2.3.3-SNAPSHOT username@target_host_ip:/dir1/dir2/

7) Start the cluster (on all nodes)

    bin/seatunnel-cluster.sh -d
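With the cluster up, you can smoke-test SeaTunnel independently of DolphinScheduler by submitting a minimal job. The FakeSource-to-Console pipeline below follows the standard connector-v2 config shape; the field names and row count are illustrative.

```
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {
    source_table_name = "fake"
  }
}
```

Save it as, for example, /tmp/fake_to_console.conf and submit it with bin/seatunnel.sh --config /tmp/fake_to_console.conf, or via the Flink start script to run it on the Flink cluster.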

4. Install DolphinScheduler

1) Extract

    tar -zxvf apache-dolphinscheduler-3.1.7.tar.gz -C /opt/software/dolphinscheduler

2) Configuration files

dolphinscheduler_env.sh:

    export JAVA_HOME=${JAVA_HOME:-/usr/local/java/jdk1.8.0_341}
    export DATABASE=${DATABASE:-mysql}
    export SPRING_PROFILES_ACTIVE=${DATABASE}
    export SPRING_DATASOURCE_URL="jdbc:mysql://mysql_host_ip:3306/dolphinscheduler_3_1_7?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    export SPRING_DATASOURCE_USERNAME=user
    export SPRING_DATASOURCE_PASSWORD=password
    export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
    export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-Asia/Shanghai}
    export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
    export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
    export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-host1_ip:2181,host2_ip:2181,host3_ip:2181}
    export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
    export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
    export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
    export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
    export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
    export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
    export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
    export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
    # Configure SeaTunnel
    export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT}
    export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
    export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
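Every line above uses the shell's `${VAR:-default}` expansion: a value that is already set (e.g. exported before install) wins, and the default applies otherwise. A quick illustration:

```shell
# FLINK_HOME is unset, so the fallback applies.
unset FLINK_HOME
echo "${FLINK_HOME:-/opt/soft/flink}"    # -> /opt/soft/flink

# A previously exported value wins over the fallback.
export FLINK_HOME=/opt/software/flink/1.16.1
echo "${FLINK_HOME:-/opt/soft/flink}"    # -> /opt/software/flink/1.16.1
```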

install_env.sh:

    ips=${ips:-"host1_ip,host2_ip,host3_ip"}
    sshPort=${sshPort:-"22"}
    masters=${masters:-"master_ip"}
    workers=${workers:-"worker1_ip:default,worker2_ip:default,worker3_ip:default"}
    alertServer=${alertServer:-"alert_ip"}
    apiServers=${apiServers:-"api_ip"}
    installPath=${installPath:-"/opt/software/dolphinscheduler/3.1.7_install"}
    deployUser=${deployUser:-"hadoop"}
    zkRoot=${zkRoot:-"/dolphinscheduler"}

3) Copy mysql-connector-java-8.0.25.jar into the /libs directory of each of the five modules: master-server, worker-server, alert-server, api-server, and tools.

4) Initialize the database (install MySQL in advance and create an empty metadata database; that metadata database must be configured in dolphinscheduler_env.sh)

Run:

    /tools/bin/upgrade-schema.sh

Note: if the driver cannot be loaded, edit /conf/application.yaml under master-server, worker-server, alert-server, api-server, and tools, changing driver-class-name: com.mysql.cj.jdbc.Driver to driver-class-name: com.mysql.jdbc.Driver.
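The driver swap can be applied with sed instead of editing each file by hand. A sketch, demonstrated here on a throwaway copy; the real targets are each module's conf/application.yaml:

```shell
# Demonstrate on a temp copy; point this at the real module config files
# (master-server/conf/application.yaml, worker-server/conf/..., etc.).
conf=/tmp/application.yaml.demo
printf 'driver-class-name: com.mysql.cj.jdbc.Driver\n' > "$conf"
sed -i 's/com\.mysql\.cj\.jdbc\.Driver/com.mysql.jdbc.Driver/' "$conf"
cat "$conf"    # -> driver-class-name: com.mysql.jdbc.Driver
```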

5) Start ZooKeeper (on all three servers)

Go to {FLINK_HOME}/bin/ and run:

    ./start-zookeeper-quorum.sh

6) Start DolphinScheduler

Go to {DOLPHINSCHEDULER_HOME}/bin/.

On the first run, execute the following command (it distributes the configuration files to the other nodes and starts the services):

    ./install.sh

Afterwards, as long as the configuration has not changed, you can start with ./start-all.sh.

7) Verify

Open http://api_ip:12345/dolphinscheduler/ui/ and log in with the default account admin/dolphinscheduler123.
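The login can also be checked from the command line against DolphinScheduler's POST /dolphinscheduler/login endpoint. Since this needs a running api-server, the snippet below only prints the command (a dry run); api_ip is the tutorial's placeholder host.

```shell
# api_ip is a placeholder; substitute your api-server host.
api="http://api_ip:12345/dolphinscheduler"
# Dry run: remove 'echo' to actually send the login request.
echo curl -X POST "$api/login" -d 'userName=admin' -d 'userPassword=dolphinscheduler123'
```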

5. Testing the DolphinScheduler + Flink + SeaTunnel Integration

1) Create a task

Choose the SeaTunnel component.

Set the startup script to start-seatunnel-flink-15-connector-v2.sh, the run mode to RUN, and the option parameter to -m flink-master-ip:8081.

Note: submitting the task at this point fails with org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: --run-mode. This happens because the DolphinScheduler source appends the parameter --run-mode run by default for RUN mode, so dolphinscheduler-task-seatunnel-xxx.jar has to be rebuilt by hand.

2) Download the dolphinscheduler-3.1.7-release branch code

3) Modify the source code

In D:\geely\code\DolphinScheduler-3.1.7-release\dolphinscheduler-task-plugin\dolphinscheduler-task-seatunnel\src\main\java\org\apache\dolphinscheduler\plugin\task\seatunnel\flink\SeatunnelFlinkParameters.java, change RUN("--run-mode run ") to RUN("").

4) Build and package

Build command:

    mvn clean install -Prelease '-Dmaven.test.skip=true' '-Dcheckstyle.skip=true' '-Dmaven.javadoc.skip=true'

When the build finishes, find dolphinscheduler-task-seatunnel-3.1.8-SNAPSHOT.jar under D:\geely\code\DolphinScheduler-3.1.7-release\dolphinscheduler-task-plugin\dolphinscheduler-task-seatunnel\target.

5) Replace the jar

In each of master-server, worker-server, alert-server, api-server, and tools under the DolphinScheduler installation, replace dolphinscheduler-task-seatunnel-3.1.7.jar in /libs/: delete dolphinscheduler-task-seatunnel-3.1.7.jar and copy in dolphinscheduler-task-seatunnel-3.1.8-SNAPSHOT.jar.
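The replacement across all five modules can be scripted. A dry-run sketch, assuming you run it from the DolphinScheduler install root; the new jar path is illustrative, and each command is echoed instead of executed:

```shell
# Illustrative path to the rebuilt jar; adjust to where you uploaded it.
newjar=/tmp/dolphinscheduler-task-seatunnel-3.1.8-SNAPSHOT.jar
for m in master-server worker-server alert-server api-server tools; do
  # Dry run: remove 'echo' to actually delete the old jar and copy the new one.
  echo rm -f "$m/libs/dolphinscheduler-task-seatunnel-3.1.7.jar"
  echo cp "$newjar" "$m/libs/"
done
```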

6) Restart the cluster

Run bin/stop-all.sh and then bin/install.sh to restart the cluster.

7) Run the task again; it now succeeds.

6. Testing DolphinScheduler + SeaTunnel in Distributed Mode

1) Create a task

Choose the SeaTunnel component, set the startup script to seatunnel.sh, and set the deploy mode to cluster.

2) Start the task and check the logs.
