1. Software versions
Flink: 1.16.1
DolphinScheduler: 3.1.7
SeaTunnel: 2.3.3-SNAPSHOT (fixes the bug in 2.3.2 that prevents job submission when the Flink engine is used)
2. Install Flink
1) Extract
tar -zxvf flink-1.16.1-bin-scala_2.12.tgz -C /opt/software/flink
cd /opt/software/flink && mv flink-1.16.1 1.16.1
2) Configuration files
flink-conf.yaml:
jobmanager.rpc.address: jobmanager_ip
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
taskmanager.host: taskmanager_ip
parallelism.default: x
rest.address: 0.0.0.0
rest.port: 8081
rest.bind-address: 0.0.0.0
env.pid.dir: /opt/log/flink/1.16.1/pids
masters:
master_ip
workers:
worker1_ip
worker2_ip
worker3_ip
zoo.cfg:
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
server.1=zookeeper1_ip:2888:3888
server.2=zookeeper2_ip:2888:3888
server.3=zookeeper3_ip:2888:3888
3) Copy to the other nodes (passwordless SSH between the servers must be set up beforehand)
scp -rp 1.16.1 username@target_ip:/dir1/dir2/
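Copying to several nodes by hand is repetitive, so the scp step can be wrapped in a small loop. The following is a minimal sketch and not part of the original setup: the function name distribute and the SCP override variable are hypothetical. It assumes passwordless SSH is already in place (commonly set up with ssh-keygen and ssh-copy-id).

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the original article): copy one directory
# to the same destination path on several hosts.
# Usage: distribute <src_dir> <user> <dest_dir> <host>...
distribute() {
  local src="$1" user="$2" dest="$3" host
  shift 3
  for host in "$@"; do
    # SCP can be overridden, e.g. SCP="echo scp", to print instead of copy.
    ${SCP:-scp} -rp "$src" "${user}@${host}:${dest}" || return 1
  done
}
```

For example, SCP="echo scp" distribute 1.16.1 username /dir1/dir2/ worker1_ip worker2_ip prints the two scp commands without running them, which is a cheap way to check the arguments first.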
4) Start the cluster (from {FLINK_HOME}/bin)
./start-cluster.sh
5) Open the web UI
http://flink-ui-ip:8081
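Besides opening the page in a browser, the cluster can be checked from the command line through Flink's REST API, which serves a cluster summary at /overview on the same port as the UI. This is a sketch only: the function names are hypothetical, and it assumes curl is installed and the REST port is the 8081 configured above.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (names are illustrative, not from the article).

# Build the URL of Flink's REST "cluster overview" endpoint.
# Usage: flink_overview_url <host> [port]   (port defaults to 8081)
flink_overview_url() {
  local host="$1" port="${2:-8081}"
  printf 'http://%s:%s/overview\n' "$host" "$port"
}

# Fetch the overview JSON; returns non-zero if the endpoint is unreachable.
flink_overview() {
  curl -fsS --max-time 5 "$(flink_overview_url "$@")"
}
```

Running flink_overview flink-ui-ip should return JSON with fields such as taskmanagers and slots-total; if the task manager count does not match the workers file, a worker probably failed to start.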

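3. Install SeaTunnel
The latest SeaTunnel release at the time of writing is 2.3.2. Testing showed that, when it is integrated with Flink 1.16.1, jobs cannot be submitted, and configuring a flink-cdc job fails with a "jar not found" error. You therefore need to build the latest branch yourself: seatunnel-dev. (The error did not appear when integrating with Flink 1.13.x, 1.14.x, or 1.15.x.)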
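1) Download the source code:
2) Build the code
In the directory that contains the top-level pom file, run command 1 in a terminal:
mvn clean install
Note: if the build fails with an error, run command 2:
mvn spotless:apply
Once it succeeds, run command 1 again.
After a successful build, apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz appears in seatunnel-dist/target/.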
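The build-then-retry sequence can be scripted as below. This is a sketch under assumptions: the function name build_seatunnel and the MVN override are hypothetical, and the article does not say which error triggers command 2. Since spotless:apply only rewrites sources to match the project's formatting rules, the retry helps only when the first failure was a formatting check.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the two Maven commands (not from the article).
# MVN can be overridden (e.g. MVN=./mvnw); it defaults to mvn on PATH.
build_seatunnel() {
  local mvn="${MVN:-mvn}"
  # Command 1: first build attempt.
  if "$mvn" clean install; then
    return 0
  fi
  # Command 2: reformat the sources, then retry command 1 once.
  "$mvn" spotless:apply || return 1
  "$mvn" clean install
}
```

Run it from the directory that contains the top-level pom file. A non-zero exit status after the retry means the failure was not a formatting problem and the Maven log needs a closer look.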

3) Upload apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz to the server
4) Extract
tar -zxvf apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz -C /opt/software/seatunnel/
5) Configuration files
Configure the SeaTunnel cluster members
hazelcast.yaml:
member-list:
  - host1_ip
  - host2_ip
  - host3_ip
hazelcast-client.yaml:
cluster-members:
  - host1_ip:5801
  - host2_ip:5801
  - host3_ip:5801
Configure the Flink location
seatunnel-env.sh:
FLINK_HOME=${FLINK_HOME:-/opt/software/flink/1.16.1}
6) Distribute to the other nodes
scp -rp apache-seatunnel-2.3.3-SNAPSHOT username@target_host_ip:/dir1/dir2/
7) Start (on all nodes)
bin/seatunnel-cluster.sh --daemon
4. Install DolphinScheduler
1) Extract
tar -zxvf apache-dolphinscheduler-3.1.7.tar.gz -C /opt/software/dolphinscheduler
2) Configuration files
dolphinscheduler_env.sh:
export JAVA_HOME=${JAVA_HOME:-/usr/local/java/jdk1.8.0_341}
export DATABASE=${DATABASE:-mysql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL="jdbc:mysql://mysql_host_ip:3306/dolphinscheduler_3_1_7?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
export SPRING_DATASOURCE_USERNAME=user
export SPRING_DATASOURCE_PASSWORD=password
export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-Asia/Shanghai}
export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-host1_ip:2181,host2_ip:2181,host3_ip:2181}
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME1=${SPARK_HOME1:-/opt/soft/spark1}
export SPARK_HOME2=${SPARK_HOME2:-/opt/soft/spark2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
# SeaTunnel configuration
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/software/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT}
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
install_env.sh:
ips=${ips:-"host1_ip,host2_ip,host3_ip"}
sshPort=${sshPort:-"22"}
masters=${masters:-"master_ip"}
workers=${workers:-"worker1_ip:default,worker2_ip:default,worker3_ip:default"}
alertServer=${alertServer:-"alert_ip"}
apiServers=${apiServers:-"api_ip"}
installPath=${installPath:-"/opt/software/dolphinscheduler/3.1.7_install"}
deployUser=${deployUser:-"hadoop"}
zkRoot=${zkRoot:-"/dolphinscheduler"}
3) Put mysql-connector-java-8.0.25.jar in the libs directory of each of the five modules: master-server, worker-server, alert-server, api-server, and tools
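Copying the driver into five directories is easy to get wrong by hand, so a loop over the module names helps. A minimal sketch, assuming the extracted DolphinScheduler directory contains the five module directories directly; the function name install_jdbc_driver is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the article): copy a JDBC driver jar into
# the libs directory of each of the five modules.
# Usage: install_jdbc_driver <dolphinscheduler_home> <driver_jar>
install_jdbc_driver() {
  local ds_home="$1" jar="$2" module
  for module in master-server worker-server alert-server api-server tools; do
    # Stop at the first module whose libs directory cannot be written.
    cp "$jar" "$ds_home/$module/libs/" || return 1
  done
}
```

The function stops at the first failed copy, so a missing module directory is reported immediately instead of surfacing later as a driver error at startup.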
4) Initialize the database (install MySQL beforehand and create an empty metadata database; its connection settings go in dolphinscheduler_env.sh)
Run tools/bin/upgrade-schema.sh
Note: if the driver cannot be loaded, edit conf/application.yaml under master-server, worker-server, alert-server, api-server, and tools, changing driver-class-name: com.mysql.cj.jdbc.Driver to driver-class-name: com.mysql.jdbc.Driver
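If the driver class does need to be changed, the same edit has to be made in five files. The sketch below applies it with sed and keeps a .bak copy of each file; the function name use_legacy_mysql_driver is hypothetical, and it assumes each module keeps its settings in conf/application.yaml as described in the note.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the article): switch driver-class-name from
# com.mysql.cj.jdbc.Driver to com.mysql.jdbc.Driver in every module.
# Usage: use_legacy_mysql_driver <dolphinscheduler_home>
use_legacy_mysql_driver() {
  local ds_home="$1" module conf
  for module in master-server worker-server alert-server api-server tools; do
    conf="$ds_home/$module/conf/application.yaml"
    # Skip modules that are not present in this installation.
    [ -f "$conf" ] || continue
    # -i.bak edits in place and leaves the original as application.yaml.bak.
    sed -i.bak 's/com\.mysql\.cj\.jdbc\.Driver/com.mysql.jdbc.Driver/' "$conf"
  done
}
```

The .bak files make the change easy to revert if the original class name turns out to be the right one for your driver version.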
5) Start ZooKeeper (run on all three servers)
Go to {FLINK_HOME}/bin/
Run ./start-zookeeper-quorum.sh
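6) Start DolphinScheduler
Go to {DOLPHINSCHEDULER_HOME}/bin/
For the first run, use the following command (it distributes the configuration files to the other nodes and starts the services):
./install.sh
Afterwards, as long as no configuration file has changed, you can start the services with ./start-all.sh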
7) Verify
Open http://api_ip:12345/dolphinscheduler/ui/
Default username/password: admin/dolphinscheduler123

5. Test the DolphinScheduler + Flink + SeaTunnel integration
1) Create a task

Select the SeaTunnel component

Startup script: start-seatunnel-flink-15-connector-v2.sh; run mode: run; option parameters: -m flink-master-ip:8081
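The article does not show the job script used for the test. For a first smoke test, a job that reads generated rows and prints them avoids any dependency on external systems. The sketch below writes such a config to a file; it is modeled on the FakeSource-to-Console batch template shipped with SeaTunnel 2.3.x, so treat the option names as assumptions to check against your build. The function name write_smoke_job is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the article): write a minimal SeaTunnel
# batch job config (FakeSource -> Console) to the given path.
# Usage: write_smoke_job <output_file>
write_smoke_job() {
  cat > "$1" <<'EOF'
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {}
}
EOF
}
```

The generated file's contents can be pasted into the task's script field; if this job runs, the scheduler, SeaTunnel, and Flink are wired together correctly and real sources and sinks can be swapped in.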

Note: submitting the task at this point fails with org.apache.flink.client.cli.CliArgsException: Could not get job jar and dependencies from JAR file: JAR file does not exist: --run-mode. The cause is that the DolphinScheduler source code adds the argument --run-mode run by default for RUN mode, so dolphinscheduler-task-seatunnel-xxx.jar has to be rebuilt by hand, as follows.

2) Download the source code of the DolphinScheduler 3.1.7-release branch
3) Modify the source code
In D:\geely\code\DolphinScheduler-3.1.7-release\dolphinscheduler-task-plugin\dolphinscheduler-task-seatunnel\src\main\java\org\apache\dolphinscheduler\plugin\task\seatunnel\flink\SeatunnelFlinkParameters.java, change RUN("--run-mode run ") to RUN("")

4) Build and package
Build command:
mvn clean install -Prelease '-Dmaven.test.skip=true' '-Dcheckstyle.skip=true' '-Dmaven.javadoc.skip=true'
After the build finishes, dolphinscheduler-task-seatunnel-3.1.8-SNAPSHOT.jar is in D:\geely\code\DolphinScheduler-3.1.7-release\dolphinscheduler-task-plugin\dolphinscheduler-task-seatunnel\target
5) Replace the jar
In the libs directory of master-server, worker-server, alert-server, api-server, and tools under the DolphinScheduler installation, replace dolphinscheduler-task-seatunnel-3.1.7.jar: delete dolphinscheduler-task-seatunnel-3.1.7.jar and put dolphinscheduler-task-seatunnel-3.1.8-SNAPSHOT.jar in its place
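The swap has to be identical in all five modules, because a leftover 3.1.7 jar next to the new one would put two copies of the plugin on the classpath. A minimal sketch of the replacement as a loop; the function name replace_seatunnel_plugin is hypothetical, and the old jar name is the one given in the step above.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not from the article): swap the stock SeaTunnel task
# plugin jar for a rebuilt one in every module's libs directory.
# Usage: replace_seatunnel_plugin <dolphinscheduler_home> <new_jar>
replace_seatunnel_plugin() {
  local ds_home="$1" new_jar="$2" module libs
  for module in master-server worker-server alert-server api-server tools; do
    libs="$ds_home/$module/libs"
    # Remove the stock jar first so only one plugin version remains.
    rm -f "$libs/dolphinscheduler-task-seatunnel-3.1.7.jar"
    cp "$new_jar" "$libs/" || return 1
  done
}
```

Run it on the node where install.sh is executed, since that script redistributes the installation to the other nodes.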
6) Restart the cluster
Run bin/stop-all.sh and then bin/install.sh again to restart the cluster
7) Run the task again; it now succeeds.

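6. Test DolphinScheduler + SeaTunnel in distributed mode
1) Create a task

Select the SeaTunnel component

For the SeaTunnel task, set the startup script to seatunnel.sh and the deployment mode to cluster

2) Start the task and check the logs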
