
[Hands-on] Shell Scheduling Made Simple: A Detailed DolphinScheduler + ProcessBuilder Tutorial

海豚调度 (DolphinScheduler) 2025-04-28


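This article explains how DolphinScheduler uses ProcessBuilder to execute shell commands. By default, BashShellInterceptorBuilder wraps the shell script and generates the command that runs it, in either normal mode or sudo mode. A Spring Boot example then shows how to set the working directory, merge the error stream into standard output, check the execution status, and print the log output, which is what managing and scheduling shell tasks builds on.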


1 Using ProcessBuilder in DolphinScheduler

1.1 Wrapping the command

org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory

public class ShellInterceptorBuilderFactory {

    private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");

    @SuppressWarnings("unchecked")
    public static IShellInterceptorBuilder newBuilder() {
        // TODO: this is the branch taken by default
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
            return new BashShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
            return new ShShellInterceptorBuilder();
        }
        if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
            return new CmdShellInterceptorBuilder();
        }
        throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
    }
}
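
The selection logic above can be tried in isolation. The following is a minimal sketch, not DolphinScheduler code: the class `ShellTypeSelectionSketch` and the enum `ShellType` are hypothetical stand-ins for the real builder classes, and a plain method argument replaces `PropertyUtils`. It only illustrates the "default to bash, match case-insensitively, reject anything else" behaviour.

```java
import java.util.Locale;

class ShellTypeSelectionSketch {

    // Hypothetical stand-in for the three builder classes in the article.
    enum ShellType { BASH, SH, CMD }

    // Mirrors the factory: a missing value falls back to "bash",
    // matching ignores case, and unknown values are rejected.
    static ShellType resolve(String configuredType) {
        String type = (configuredType == null) ? "bash" : configuredType;
        switch (type.toLowerCase(Locale.ROOT)) {
            case "bash":
                return ShellType.BASH;
            case "sh":
                return ShellType.SH;
            case "cmd":
                return ShellType.CMD;
            default:
                throw new IllegalArgumentException("not support shell type: " + type);
        }
    }

    public static void main(String[] args) {
        // Assumption for the demo: the value comes from a JVM system property
        // with the same key the article's factory reads via PropertyUtils.
        System.out.println(resolve(System.getProperty("shell.interceptor.type")));
    }
}
```

Run without `-Dshell.interceptor.type=...`, the sketch resolves to `BASH`, which matches the default described above.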

The default is BashShellInterceptorBuilder:

org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder

public class BashShellInterceptorBuilder
        extends
            BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {

    @Override
    public BashShellInterceptorBuilder newBuilder() {
        return new BashShellInterceptorBuilder();
    }

    @Override
    public BashShellInterceptor build() throws FileOperateException, IOException {
        // TODO: core step, generates the shell script
        generateShellScript();
        List<String> bootstrapCommand = generateBootstrapCommand();
        // TODO: instantiate BashShellInterceptor
        return new BashShellInterceptor(bootstrapCommand, shellDirectory);
    }

    // Interpreter used as the command prefix when the script is not run through sudo
    @Override
    protected String shellInterpreter() {
        return "bash";
    }

    @Override
    protected String shellExtension() {
        return ".sh";
    }

    @Override
    protected String shellHeader() {
        return "#!/bin/bash";
    }
}
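
The article does not show the body of `generateShellScript()`, so the following is only a hypothetical sketch of how the three hooks above (header, extension, and a script body) might be combined into a file on disk. The class `ScriptFileSketch`, the method `writeScript`, and its parameters are all assumptions made for illustration and are not DolphinScheduler's actual implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class ScriptFileSketch {

    // Hypothetical helper: writes "<header>\n<body>\n" to <directory>/<name><extension>.
    static Path writeScript(Path directory, String name, String header,
                            String extension, String body) throws IOException {
        Files.createDirectories(directory);
        Path script = directory.resolve(name + extension);
        String content = header + "\n" + body + "\n";
        Files.write(script, content.getBytes(StandardCharsets.UTF_8));
        // Assumption: the file is made executable because sudo mode passes the
        // script path itself as the command instead of prefixing an interpreter.
        script.toFile().setExecutable(true, false);
        return script;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("shell-sketch");
        Path script = writeScript(dir, "demo", "#!/bin/bash", ".sh", "echo hello");
        System.out.println("wrote " + script);
    }
}
```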

org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#generateBootstrapCommand

protected List<String> generateBootstrapCommand() {
        if (sudoEnable) {
            // TODO: this is the default path; the command is effectively: sudo -u <tenant> -i opt/xx.sh
            return bootstrapCommandInSudoMode();
        }
        // TODO: bash opt/xx.sh
        return bootstrapCommandInNormalMode();
    }

bootstrapCommandInSudoMode():

private List<String> bootstrapCommandInSudoMode() {
        if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
            return bootstrapCommandInResourceLimitMode();
        }
        List<String> bootstrapCommand = new ArrayList<>();
        bootstrapCommand.add("sudo");
        if (StringUtils.isNotBlank(runUser)) {
            bootstrapCommand.add("-u");
            bootstrapCommand.add(runUser);
        }
        bootstrapCommand.add("-i");
        bootstrapCommand.add(shellAbsolutePath().toString());
        return bootstrapCommand;
    }

bootstrapCommandInNormalMode():

private List<String> bootstrapCommandInNormalMode() {
        List<String> bootstrapCommand = new ArrayList<>();
        bootstrapCommand.add(shellInterpreter());
        bootstrapCommand.add(shellAbsolutePath().toString());
        return bootstrapCommand;
    }
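
To make the two shapes of the command concrete, here is a small self-contained sketch of the same branching. It is an illustration under assumptions: `BootstrapCommandSketch` and its parameters are hypothetical names, the script path and user are made up, and the resource-limit branch from the original is left out.

```java
import java.util.ArrayList;
import java.util.List;

class BootstrapCommandSketch {

    // Builds the argument list either as "sudo [-u user] -i script" or as "interpreter script".
    static List<String> bootstrapCommand(boolean sudoEnable, String runUser,
                                         String interpreter, String scriptPath) {
        List<String> command = new ArrayList<>();
        if (sudoEnable) {
            command.add("sudo");
            if (runUser != null && !runUser.trim().isEmpty()) {
                command.add("-u");
                command.add(runUser);
            }
            command.add("-i");
        } else {
            command.add(interpreter);
        }
        command.add(scriptPath);
        return command;
    }

    public static void main(String[] args) {
        // prints [sudo, -u, tenant, -i, /opt/test/my.sh]
        System.out.println(bootstrapCommand(true, "tenant", "bash", "/opt/test/my.sh"));
        // prints [bash, /opt/test/my.sh]
        System.out.println(bootstrapCommand(false, "tenant", "bash", "/opt/test/my.sh"));
    }
}
```

Keeping each argument as its own list element, rather than joining them into one string, means ProcessBuilder passes them to the operating system unchanged and no shell quoting is involved.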

1.2 Executing the command

org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor

public abstract class BaseShellInterceptor implements IShellInterceptor {

    protected final String workingDirectory;
    protected final List<String> executeCommands;

    protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
        this.executeCommands = executeCommands;
        this.workingDirectory = workingDirectory;
    }

    @Override
    public Process execute() throws IOException {
        // init process builder
        ProcessBuilder processBuilder = new ProcessBuilder();
        // setting up a working directory
        // TODO: set the working directory so that the script can load resources such as jar files relative to it
        processBuilder.directory(new File(workingDirectory));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        processBuilder.command(executeCommands);
        log.info("Executing shell command : {}", String.join(" ", executeCommands));
        return processBuilder.start();
    }
}
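
The effect of `directory(...)` and `redirectErrorStream(true)` is easiest to see with a tiny command that writes to both streams. The sketch below is an illustration, not DolphinScheduler code: `MergedOutputSketch` is a hypothetical class, and it assumes a POSIX `sh` is available on the PATH.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergedOutputSketch {

    // Runs the command in the given directory and returns every output line,
    // with stderr merged into stdout.
    static List<String> run(File workingDirectory, List<String> command)
            throws IOException, InterruptedException {
        ProcessBuilder processBuilder = new ProcessBuilder(command);
        processBuilder.directory(workingDirectory);
        processBuilder.redirectErrorStream(true);
        Process process = processBuilder.start();

        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        process.waitFor();
        return lines;
    }

    public static void main(String[] args) throws Exception {
        File tmp = new File(System.getProperty("java.io.tmpdir"));
        // "pwd" shows the working directory; the second echo goes to stderr.
        List<String> output = run(tmp, Arrays.asList("sh", "-c", "pwd; echo to-stderr 1>&2"));
        output.forEach(System.out::println);
    }
}
```

Because the error stream is merged, a single reader on `getInputStream()` sees both kinds of output, so there is no second pipe that could fill up and block the child process.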


2 Best-practice example

2.1 pom.xml configuration

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
  <version>2.6.1</version>
</dependency>

2.2 Application code

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);

        List<String> executeCommands = new ArrayList<>();
        executeCommands.add("sudo");
        executeCommands.add("-u");
        executeCommands.add("qiaozhanwei");
        executeCommands.add("-i");
        executeCommands.add("/opt/test/my.sh");

        ProcessBuilder processBuilder = new ProcessBuilder();
        // setting up a working directory
        // TODO: set the working directory so that the script can load resources such as jar files relative to it
        processBuilder.directory(new File("/opt/test"));
        // merge error information to standard output stream
        processBuilder.redirectErrorStream(true);
        processBuilder.command(executeCommands);
        Process process = processBuilder.start();

        try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = inReader.readLine()) != null) {
                // TODO: print the process output to the console
                System.out.println(line);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // TODO: wait up to 10 minutes; if the process has not finished by then, waitFor returns false
        boolean status = process.waitFor(10, TimeUnit.MINUTES);

        System.out.println("status ->" + status);
    }
}
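
One detail of the example above is worth noting: the read loop only ends when the process closes its output, so the 10-minute `waitFor` starts counting after the output has finished. If the timeout should bound the whole run, one option is to read the output on a separate thread. The following is a minimal sketch of that variation, not the article's code: `TimeoutSketch` and `runWithTimeout` are hypothetical names, and the demo commands assume a POSIX system with `sleep` and `sh` available.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

class TimeoutSketch {

    // Returns true if the command finished within the timeout,
    // false if it was still running and had to be killed.
    static boolean runWithTimeout(List<String> command, long timeout, TimeUnit unit)
            throws IOException, InterruptedException {
        ProcessBuilder processBuilder = new ProcessBuilder(command);
        processBuilder.redirectErrorStream(true);
        Process process = processBuilder.start();

        // Drain the output in the background so the timeout below covers the whole run.
        Thread reader = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                // The stream was closed, for example because the process was killed.
            }
        }, "process-output-reader");
        reader.setDaemon(true);
        reader.start();

        boolean finished = process.waitFor(timeout, unit);
        if (!finished) {
            process.destroyForcibly();
            process.waitFor();
        }
        // Give the reader a bounded amount of time to flush the remaining lines.
        reader.join(1000);
        return finished;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("status ->" + runWithTimeout(Arrays.asList("sh", "-c", "echo done"), 5, TimeUnit.SECONDS));
        System.out.println("status ->" + runWithTimeout(Arrays.asList("sleep", "5"), 1, TimeUnit.SECONDS));
    }
}
```

Killing the direct child does not necessarily stop processes that the child started itself, which is why the reader thread is a daemon and is joined with a bound rather than indefinitely.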

2.3 Log output

  .   ____          _            __ _ _
 /\\ ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, |
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2024-06-15 18:33:16.090  INFO 31834 --- [           main] com.journey.test.Application             : Starting Application using Java 1.8.0_401 on 192.168.1.4 with PID 31834 (/Users/qiaozhanwei/IdeaProjects/springboot2/target/classes started by qiaozhanwei in Users/qiaozhanwei/IdeaProjects/springboot2)
2024-06-15 18:33:16.091  INFO 31834 --- [           main] com.journey.test.Application             : No active profile set, falling back to default profiles: default
2024-06-15 18:33:16.244  INFO 31834 --- [           main] com.journey.test.Application             : Started Application in 0.252 seconds (JVM running for 0.42)
Number of Maps  = 1
Samples per Map = 100000
2024-06-15 18:33:16,790 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Wrote input for Map #0
Starting Job
2024-06-15 18:33:17,329 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at kvm-10-253-26-85/10.253.26.85:8032
2024-06-15 18:33:17,586 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: tmp/hadoop-yarn/staging/qiaozhanwei/.staging/job_1694766249884_0931
2024-06-15 18:33:17,837 INFO input.FileInputFormat: Total input files to process : 1
2024-06-15 18:33:18,024 INFO mapreduce.JobSubmitter: number of splits:1
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1694766249884_0931
2024-06-15 18:33:18,460 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-06-15 18:33:18,648 INFO conf.Configuration: resource-types.xml not found
2024-06-15 18:33:18,648 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2024-06-15 18:33:18,698 INFO impl.YarnClientImpl: Submitted application application_1694766249884_0931
2024-06-15 18:33:18,734 INFO mapreduce.Job: The url to track the job: http://kvm-10-253-26-85:8088/proxy/application_1694766249884_0931/
2024-06-15 18:33:18,734 INFO mapreduce.Job: Running job: job_1694766249884_0931
2024-06-15 18:33:24,978 INFO mapreduce.Job: Job job_1694766249884_0931 running in uber mode : false
2024-06-15 18:33:24,978 INFO mapreduce.Job:  map 0% reduce 0%
2024-06-15 18:33:29,153 INFO mapreduce.Job:  map 100% reduce 0%
2024-06-15 18:33:34,384 INFO mapreduce.Job:  map 100% reduce 100%
2024-06-15 18:33:34,455 INFO mapreduce.Job: Job job_1694766249884_0931 completed successfully
2024-06-15 18:33:34,565 INFO mapreduce.Job: Counters: 54
    File System Counters
        FILE: Number of bytes read=28
        FILE: Number of bytes written=548863
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=278
        HDFS: Number of bytes written=215
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=3
        HDFS: Number of bytes read erasure-coded=0
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=37968
        Total time spent by all reduces in occupied slots (ms)=79360
        Total time spent by all map tasks (ms)=2373
        Total time spent by all reduce tasks (ms)=2480
        Total vcore-milliseconds taken by all map tasks=2373
        Total vcore-milliseconds taken by all reduce tasks=2480
        Total megabyte-milliseconds taken by all map tasks=4859904
        Total megabyte-milliseconds taken by all reduce tasks=10158080
    Map-Reduce Framework
        Map input records=1
        Map output records=2
        Map output bytes=18
        Map output materialized bytes=28
        Input split bytes=160
        Combine input records=0
        Combine output records=0
        Reduce input groups=2
        Reduce shuffle bytes=28
        Reduce input records=2
        Reduce output records=0
        Spilled Records=4
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=87
        CPU time spent (ms)=1420
        Physical memory (bytes) snapshot=870387712
        Virtual memory (bytes) snapshot=9336647680
        Total committed heap usage (bytes)=2716860416
        Peak Map Physical memory (bytes)=457416704
        Peak Map Virtual memory (bytes)=3773362176
        Peak Reduce Physical memory (bytes)=412971008
        Peak Reduce Virtual memory (bytes)=5563285504
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=118
    File Output Format Counters 
        Bytes Written=97
Job Finished in 17.292 seconds
Estimated value of Pi is 3.14120000000000000000
status ->true

Process finished with exit code 0

Reposted from Journey
Original article: https://segmentfault.com/a/1190000044966157





