暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

实战分享:DolphinScheduler 中 Shell 任务环境变量最佳配置方式

海豚调度 2025-05-29
162

点击蓝字,关注我们

在使用 Apache DolphinScheduler 编排任务的过程中,Shell 类型任务是最常见的任务类型之一。然而,很多用户在实际使用中都会遇到一个看似简单却常常引发问题的问题——环境变量怎么设置才有效?

如果你也曾经因为任务执行环境不一致、找不到命令路径、引用变量失败等问题而抓狂,那么这篇文章将为你拨开迷雾。本文将深入解析 DolphinScheduler 中 Shell 任务的环境变量设置机制,分享几种常见的配置方式、注意事项以及实战踩坑经验,帮助你高效、稳定地配置任务运行环境。


1

任务类型总结


  • SHELL任务类型:

    SHELL、JAVA、PYTHON、FLINK、MR、FLINK_STREAM、HIVECLI、SPARK、SEATUNNEL、DATAX、SQOOP、DATA_QUALICY、JUPYTER、MLFLOW、OPENMLDB、DVC、PYTORCH、KUBEFLOW、CHUNJUN、LINKIS

注意 : 所谓的SHELL任务类型,都是对SHELL任务类型进行的封装,说白了底层调用的就是Java ProcessBudiler
封装的SHELL。

  • SQL任务类型(JDBC):

    SQL、PROCEDURE

注意 : SQL任务类型其实使用的就是各个DB驱动的JDBC进行的操作。

  • HTTP任务类型:

    HTTP、DINKY、PIGEON(WebSocket)

注意 : HTTP任务类型其实就是访问其OPEN API,进行HttpClient
封装调用的操作。

  • 逻辑节点:

    SUB_PROCESS、DEPENDENT、CONDITIONS、SWITHC、DYNAMIC

注意 : 所谓的逻辑节点是虚拟任务,这类任务不会调度到Worker节点上去运行,只会在Master节点作为控制节点。

  • Client任务类型:

    EMR、K8S、DMS、DATA_FACTORY、SAGEMAKER、ZEPPELIN、DATASYNC、REMOTESHELL

注意 : 其实就是调用各个任务的开放的Client进行任务的封装。


2

Shell任务怎么配置环境变量呢?


因为可能涉及到一个组件的不同的版本的客户端,比如说Spark2、Spark3。还有就是针对不同集群的不同客户端,比如说集群1的Spark3客户端和集群2的Spark客户端。 像这样的需求,怎么在dolphinscheduler中进行配置呢?或者说有几种配置方式呢?

两种方式 : 1、通过task不同的环境变量 2、默认的环境变量

1. 通过task不同的环境变量

安全中心 -> 环境管理

任务中引用

默认的环境变量

common.properties

    # The default env list will be load by Shell task, e.g. etc/profile,~/.bash_profile
    shell.env_source_list=/etc/profile
    # The interceptor type of Shell task, e.g. bash, sh, cmd
    shell.interceptor.type=bash

    org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory

      public class ShellInterceptorBuilderFactory {
          private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type""bash");
          @SuppressWarnings("unchecked")
          public static IShellInterceptorBuilder newBuilder() {
              // TODO 默认的走的是这个逻辑
              if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
                  return new BashShellInterceptorBuilder();
              }
              if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
                  return new ShShellInterceptorBuilder();
              }
              if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
                  return new CmdShellInterceptorBuilder();
              }
              throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
          }
      }

      org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder

        public class BashShellInterceptorBuilder
                extends
                    BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilderBashShellInterceptor> {
            @Override
            public BashShellInterceptorBuilder newBuilder() {
                return new BashShellInterceptorBuilder();
            }
            @Override
            public BashShellInterceptor build() throws FileOperateExceptionIOException {
                // TODO 这里是生成shell脚本的核心点
                generateShellScript();
                List<String> bootstrapCommand = generateBootstrapCommand();
                // TODO 实例化BashShellInterceptor
                return new BashShellInterceptor(bootstrapCommand, shellDirectory);
            }
            @Override
            protected String shellInterpreter() {
                return "bash";
            }
            @Override
            protected String shellExtension() {
                return ".sh";
            }
            @Override
            protected String shellHeader() {
                return "#!/bin/bash";
            }
        }

        org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#run

          public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
                                      TaskCallBack taskCallBack) throws Exception {
                  TaskResponse result = new TaskResponse();
                  int taskInstanceId = taskRequest.getTaskInstanceId();
                  // todo: we need to use state like JDK Thread to make sure the killed task should not be executed
                  iShellInterceptorBuilder = iShellInterceptorBuilder
                          // TODO 设置执行路径
                          .shellDirectory(taskRequest.getExecutePath())
                          // TODO 这里设置shell 名字
                          .shellName(taskRequest.getTaskAppId());
                  // Set system env
                  // TODO 在这里是设置默认的,其实也是可以设置为 opt/dolphinscheduler/bin/env/dolphinscheduler_env.sh
                  if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
                      // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表
                      ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
                  }
                  // Set custom env
                  // TODO 设置自定义的env
                  if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
                      // TODO 向 customEnvScripts 中加入
                      iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
                  }
                  // Set k8s config (This is only work in Linux)
                  if (taskRequest.getK8sTaskExecutionContext() != null) {
                      iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
                  }
                  // Set sudo (This is only work in Linux)
                  // TODO 设置sudo为true的模式
                  iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
                  // Set tenant (This is only work in Linux)
                  // TODO 设置租户
                  iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
                  // Set CPU Quota (This is only work in Linux)
                  if (taskRequest.getCpuQuota() != null) {
                      iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
                  }
                  // Set memory Quota (This is only work in Linux)
                  if (taskRequest.getMemoryMax() != null) {
                      iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
                  }
                  IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
                  // TODO 使用ProcessBuilder进行执行,支持sudo模式,和bash模式
                  process = iShellInterceptor.execute();
                  // parse process output
                  // TODO 这里解析到进程的输出
                  parseProcessOutput(this.process);
                  // collect pod log
                  collectPodLogIfNeeded();
                  int processId = getProcessId(this.process);
                  result.setProcessId(processId);
                  // cache processId
                  taskRequest.setProcessId(processId);
                  // print process id
                  log.info("process start, process id is: {}", processId);
                  // if timeout occurs, exit directly
                  long remainTime = getRemainTime();
                  // update pid before waiting for the run to finish
                  if (null != taskCallBack) {
                      // TODO 这里其实就是更新任务实例西悉尼
                      taskCallBack.updateTaskInstanceInfo(taskInstanceId);
                  }
                  // waiting for the run to finish
                  boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
                  TaskExecutionStatus kubernetesStatus =
                          ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
                  if (taskOutputFuture != null) {
                      try {
                          // Wait the task log process finished.
                          taskOutputFuture.get();
                      } catch (ExecutionException e) {
                          log.error("Handle task log error", e);
                      }
                  }
                  if (podLogOutputFuture != null) {
                      try {
                          // Wait kubernetes pod log collection finished
                          podLogOutputFuture.get();
                          // delete pod after successful execution and log collection
                          ProcessUtils.cancelApplication(taskRequest);
                      } catch (ExecutionException e) {
                          log.error("Handle pod log error", e);
                      }
                  }
                  // if SHELL task exit
                  if (status && kubernetesStatus.isSuccess()) {
                      // SHELL task state
                      result.setExitStatusCode(this.process.exitValue());
                  } else {
                      log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
                              taskRequest.getTaskTimeout());
                      result.setExitStatusCode(EXIT_CODE_FAILURE);
                      cancelApplication();
                  }
                  int exitCode = this.process.exitValue();
                  String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
                  log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
                          exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
                  return result;
              }

          重点就是:

            // Set system env
                    // TODO 在这里是设置默认的,其实也是可以设置为 opt/dolphinscheduler/bin/env/dolphinscheduler_env.sh
                    if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
                        // TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表
                        ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
                    }
                    // Set custom env
                    // TODO 设置自定义的env
                    if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
                        // TODO 向 customEnvScripts 中加入
                        iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
                    }

            其实就是说自定的环境变量是可以覆盖默认的环境变量的。

            转载自Journey
            原文链接:https://segmentfault.com/a/1190000044954252





            用户案例



            网易邮箱 每日互动 惠生工程  作业帮  
            博世智驾 蔚来汽车 长城汽车集度长安汽车
            思科网讯食行生鲜联通医疗联想
            新网银行唯品富邦消费金融 
            自如有赞伊利当贝大数据
            珍岛集团传智教育Bigo
            YY直播  三合一太美医疗
            Cisco Webex兴业证券




            迁移实战



            Azkaban   Ooize(当贝迁移案例)
            Airflow (有赞迁移案例)
            Air2phin(迁移工具)
            Airflow迁移实践



            发版消息




            Apache DolphinScheduler 3.2.2版本正式发布!
            Apache DolphinScheduler 3.2.1 版本发布:增强功能与安全性的全面升级
            Apache DolphinScheduler 3.3.0 Alpha发布,功能增强与性能优化大升级!




            加入社区



            关注社区的方式有很多:

            • GitHub: https://github.com/apache/dolphinscheduler
            • 官网:https://dolphinscheduler.apache.org/en-us
            • 订阅开发者邮件:dev@dolphinscheduler@apache.org
            • X.com:@DolphinSchedule
            • YouTube:https://www.youtube.com/@apachedolphinscheduler
            • Slack:https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1cmrxsio1-nJHxRJa44jfkrNL_Nsy9Qg

            同样地,参与Apache DolphinScheduler 有非常多的参与贡献的方式,主要分为代码方式和非代码方式两种。

            📂非代码方式包括:

            完善文档、翻译文档;翻译技术性、实践性文章;投稿实践性、原理性文章;成为布道师;社区管理、答疑;会议分享;测试反馈;用户反馈等。

            👩‍💻代码方式包括:

            查找Bug;编写修复代码;开发新功能;提交代码贡献;参与代码审查等。

            贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

            社区汇总了以下适合新手的问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3A%22first+time+contributor%22

            优先级问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3Apriority%3Ahigh

            如何参与贡献链接https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/%E8%B4%A1%E7%8C%AE%E6%8C%87%E5%8D%97_menu/%E5%A6%82%E4%BD%95%E5%8F%82%E4%B8%8E_menu

            如果你❤️小海豚,就来为我点亮Star吧!

            https://github.com/apache/dolphinscheduler


            你的好友秀秀子拍了拍你

            并请你帮她点一下“分享”

            文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论