暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Apache Dolphinscheduler 任务插件版图再添 Linkis,大幅提高计算治理能力

海豚调度 2022-12-21
1032

 点亮 ⭐️ Star · 照亮开源之路

https://github.com/apache/dolphinscheduler


今天,我们来看一下 Apache DolphinScheduler 如何与 Apache Linkis 做集成。两个月前,Apache DolphinScheduler 社区 PMC 邀请我一起来做这项工作,我觉得很有意思,就把它接下来了。

这次分享主要分为两块,一是讲解 Apache DolphinScheduler 的任务插件扩展是怎么设计的,为什么这么设计;二是 Apache DolphinScheduler 与 Linkis 是怎么集成的,包括我和 Linkis PMC 的相关讨论。也欢迎大家踊跃加入,把更多插件贡献出来,一起开源。




任务插件扩展介绍


1

Apache DolphinScheduler


首先介绍一下 Apache DolphinScheduler,这是一个分布式,去中心化,易扩展的可视化 DAG 工作流项目调度平台,致力于解决数据处理流过程中复杂的依赖关系,让调度系统在数据处理过程中开箱即用。


上图为 Apache DolphinScheduler 目前的架构图,主要分为 Master 和 Worker,其中 Master 用来做分发调度,Worker 用来做任务执行,包括 UI、API、Alert  告警、ZK 分布式处理等,最后执行各式各样的任务。

Apache DolphinScheduler 有 4 个特性,一是高可靠性,去中心化的多 Master 和多Worker 服务对等架构, 避免单 Master 压力过大,另外采用任务缓冲队列来避免过载;

第二,简单易用,通过拖拽完成 DAG 定义,和 Airflow 不同,后者对于具有一定 Python 功底、擅长统计学的工程师和分析师才会友好,而 DolphinScheduler 可以通过拖拽定义工作流;第三,使用场景丰富,支持多租户,支持暂停恢复操作. 紧密贴合大数据生态,提供 Spark, Hive, M/R, Python, Sub_process, Shell 等任务类型,现在随着 3.0 版本的迭代,支持的任务类型更加丰富;第四,高扩展性,因为 Master 和 Worker 是无中心的分布式设计,所以可以一直水平扩展,即使随着业务场景越来越多,调度规模越来越大,也能通过动态上下线实现调度。

2

Dolphinscheduler 任务组件使用示例


接下来,我们通过 SparkSQL 为例,来介绍 Apache DolphinScheduler 任务组件如何使用。具体是为创建一个视图表 terms 并写入三行数据和一个格式为 Parquet 的表 wc 并判断该表是否存在。程序类型为 SQL,将视图表 terms 的数据插入到格式为 Parquet 的表 wc。

在工作流定义这一块,可以看到 DolphinScheduler 现在所拥有的任务组件,包括一些简单的 Shell, 一些逻辑组件,像 Sub-process,还有 dependent 这些依赖组件。

有些是通用节点,还有一些任务执行节点。拖进这些节点之后,就可以跳出下图所示界面:

每个任务都不一样,我们编辑自己需要执行的内容,设置 SPI 等,点击确认,就可以成功地定义一个工作流,并且可以监控状态、获取日志等。

3

SPI 服务发现


我们有这么多任务组件,是怎么做的呢?这就涉及到了我们刚才讲到的 SPI 服务发现功能。

SPI 全称叫这个Service Provider Interface,是 JDK 内置的一种服务提供发现机制。大多数人可能会很少用到它,因为它的定位主要是面向开发厂商的,在 java.util.ServiceLoader 的文档里有比较详细的介绍,其抽象的概念是指动态加载某个服务实现。


这就是加载整个动态的服务实现。从 DolphinScheduler 的 worker Server Start 服务启动开始,有一个类叫  TaskpluginManager的类会进行装载、监听,然后触发 SPI Classloader,loading 这些 class。

在 DolphinScheduler 上有二开的小伙伴会在每次 Woker 启动时看到日志里会打印这些信息。

Apache DolphinScheduler 执行的任务分为逻辑 Task 和物理 Task,逻辑 Task 指 DependTask,SwitchTask 这种逻辑上的 Task;物理 Task 是指 ShellTask,SQLTask 这种执行任务的 Task。而在 Apache DolphinScheduler 中,我们一般扩充的都是物理 Task,而物理 Task 都是交由 Worker 去执行,所以我们要明白的是,当我们在有多台 Worker 的情况下,要将自定义的 Task 分发到每一台有 Worker 的机器上,当我们启动 Worker 服务时,Worker 会去启动一个 ClassLoader 来加载相应的实现了规则的 Task lib。此次集成 Linkis 也是这样的过程。

4

构建Task实现


通过 TaskChannel 我们得到了可执行的物理 Task,但是我们需要给当前 Task 添加相应的实现,才能够让Apache DolphinScheduler 去执行你的任务,首先在编写 Task 之前我们需要先了解一下 Task 之间的关系:

上图相当于一个类的集成图。最上角是抽象的 task 类,下面集成了这个抽象 task 类,以及 task excuter。下面分为 3 个大类型,分别为基于 YARN、Python 和 Shell 的集成。

5

任务插件开发流程(基于 SHELL 的任务)


  • 基于 YARN 的计算(参见 MapReduceTask)

    • 需要在 org.apache.dolphinscheduler.server.worker.task 下的 TaskManager 类中创建自定义任务(也需在TaskType注册对应的任务类型)
    • 需要集成 org.apache.dolphinscheduler.server.worker.task 下的 AbstractYarnTask
    • 构造方法调度 AbstractYarnTask 构造方法
    • 集成 AbstractParameters 自定义任务参数实体
    • 重写 AbstractTask 的 init 方法中解析自定义任务参数
    • 重写 buildCommand 封装 command

  • 基于非 YARN 的计算(参见 ShellTask)

    • 需要在 org.apache.dolphinscheduler.server.worker.task 下的 TaskManager 中创建自定义任务
    • 需要集成 org.apache.dolphinscheduler.server.worker.task 下的 AbstractTask
    • 构造方法中实例化 ShellCommandExecutor

    public ShellTask(TaskProps props, Logger logger) {
    super(props, logger);
    this.taskDir = props.getTaskDir();
    this.processTask = new ShellCommandExecutor(this::logHandle,
    props.getTaskDir(), props.getTaskAppId(),
    props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
    props.getTaskTimeout(), logger);
    this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
    }

      • 传入自定义任务的 TaskProps和自定义Logger,TaskProps 封装了任务的信息,Logger分装了自定义日志信息
      • 集成 AbstractParameters 自定义任务参数实体
      • 重写 AbstractTask 的 init 方法中解析自定义任务参数实体
      • 重写 handle 方法,调用 ShellCommandExecutor 的 run 方法,第一个参数传入自己的command,第二个参数传入 ProcessDao,设置相应的 exitStatusCode

    • 基于非 SHELL 的任务(参见 SqlTask)

      • 需要在 org.apache.dolphinscheduler.server.worker.task 下的 TaskManager 中创建自定义任务
      • 需要集成org.apache.dolphinscheduler.server.worker.task 下的 AbstractTask
      • 集成 AbstractParameters 自定义任务参数实体
      • 构造方法或者重写 AbstractTask 的 init 方法中,解析自定义任务参数实体
      • 重写 handle 方法实现业务逻辑并设置相应的exitStatusCode

    看起来很麻烦,但是不用担心,通过我接下来对集成 Linkis 过程的讲解,大家就会知道任务插件流程其实很简单。




    Apache DolphinScheduler与Linkis 的集成


    接下来进入第二部分,来讲 DolphinScheduler 如何与 Linkis 集成,我是怎么如了解 Linkis 的。集成完之后,大家会发现其实这个事情非常简单。

    1

    Linkis 架构概要


    本着开源共建的思想,我们集成了优秀的计算中间件 Linkis 作为任务插件。作为计算中间件,Linkis 提供了强大的连通、复用、编排、扩展和治理管控能力。通过计算中间件将应用层和引擎层解耦,简化了复杂的网络调用关系,降低了整体复杂度,同时节约了整体开发和维护成本。

    Linkis 提供了计算治理服务、公共增强服务和微服务治理服务三大服务,我们主要用到了它的计算治理服务,在提交准备执行阶段,我们可以在 Linkis 计算中间件中执行很多任务类型,如果任务过多,Linkis 可以帮助我们抽象、复用这些计算过程。
    Linkis 架构

    2

    应用方式一(JAVA SDK)


    接下来我们来讲一下,DolphinScheduler 如何集成 Linkis,以及我在这个过程中的思考。

    Linkis 提供两个对我们来说非常好用的两个集成类,一个是 Java SDK,一个是 Linkis-Cli,也就是 Shell 的能力。

    Java SDK 是 Linkis 官方提供的一个 Java 包,可以在前面 pom 里面引用依赖之后定义版本,后面我们可以在里面定义引擎和参数,包括比如 Spark 的运行参数,Linkis 本身的 YARN 队列参数等,定义好之后执行并提交。

    示例代码如下:

      private static JobExecuteResult toSubmit(String user, String code) {
      // 1. build params
      // set label map :EngineTypeLabel/UserCreatorLabel/EngineRunTypeLabel/Tenant
      Map labels = new HashMap();
      labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-2.4.3"); // required engineType Label
      labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, user + "-APPName");// required execute user and creator eg:hadoop-IDE
      labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py"); // required codeType
      // set start up map :engineConn start params
      Map startupMap = new HashMap(16);
      // Support setting engine native parameters,For example: parameters of engines such as spark/hive
      startupMap.put("spark.executor.instances", 2);
      // setting linkis params
      startupMap.put("wds.linkis.rm.yarnqueue", "dws");




      // 2. build jobSubmitAction
      JobSubmitAction jobSubmitAction = JobSubmitAction.builder()
      .addExecuteCode(code)
      .setStartupParams(startupMap)
      .setUser(user) //submit user
      .addExecuteUser(user) // execute user
      .setLabels(labels)
      .build();
      // 3. to execute
      return client.submit(jobSubmitAction);
      }

      引入依赖模块,创建 Client 的过程:

         org.apache.linkis
        linkis-computation-client
        ${linkis.version}
        如:
        org.apache.linkis
        linkis-computation-client
        1.0.3

        然而,虽然这是一个非常好的 SDK 包,理论上说非常容易集成到 DolphinScheduler,但是它有一个致命的弱点,那就是需要适配 Linkis 的版本。但是适配 Linkis 每个版本不现实,所以最终我们最终放弃这个方法。

        3

        应用方式二(Linkis-Cli)


        经过和 Linkis 社区 PPMC 沟通,我们最终选择了以 Shell 方式去调用 Flink 任务来减少多版本兼容上的差异,来避免这个问题。

        于是我们就想到了用 Linkis 提供的一个封装好的 Client,它只需要在 Linkis 的安装目录里面跑一段代码,就可以运行 SparkSQL。具体如下:

        第一步,检查conf/目录下是否存在默认配置文件linkis-cli.properties,且包含以下配置:
         
           #linkis-mg-gateway服务地址
          wds.linkis.client.common.gatewayUrl=http://127.0.0.1:9001
          #认证鉴权策略 token/static
          wds.linkis.client.common.authStrategy=token
          #static 模式下为用户名/密码,token模式下为linkis-mg-gateway_auth_token表中token_name 和logal_users
          wds.linkis.client.common.tokenKey=Validation-Code
          wds.linkis.client.common.tokenValue=BML-AUTH

          第二步,进入 Linkis 安装目录,输入指令
            sh ./bin/linkis-cli -engineType spark-2.4.3 -codeType sql -code "select count(*) from testdb.test;"  -submitUser hadoop -proxyUser hadoop

            第三步,您会在控制台看到任务被提交到Linkis,并开始执行的信息。

            这个方法并不需要在意 Linkis 的版本,只需要确定执行引擎是什么,你执行的用户是什么,你执行的内容是什么,就可以被提交到 Linkis 去执行。

            我们把这个思路用到了 DolphinScheduler 集成 Linkis 的整个过程中。

            若生产环境中要是使用到 Linkis 任务类型,则需要先配置好所需的环境,配置文件如下:
              /dolphinscheduler/conf/env/dolphinscheduler_env.sh

              定义好之后,可以在其他的这个类里面选出 Linkis,可以看到 Linkis 的参数:

              刚刚我们所提到的,可以定义引擎的类型,Code type 等可以加入到 Linkis 的 Prop 和 Value 里,就完成了第一个属性的创建。

              接下来我们可以点一下加号,尽可能多地填入属性,然后去创建我们的 Linkis 任务。

              4

              集成流程


              具体是怎么做的呢?这是我开发 task Linkis 的集成。

              Linkis 插件的目录结构,主要分为四个文件:
              • LinkisParameters(参数)
              • LinkisTask(任务执行)
              • LinkisTaskChannel(创建、取消等实现类)
              • LinkisTaskChannelFactory(TaskChannel的工厂实现类)


              参数和任务执行是比较上层的方法,主要的开发主要是这两个方法。

              集成流程的时候,主要是提交任务这一块,和构建我们的 Command。


              我们来看这个提交方法,根据用户的输入、参数的配置构建了一个 Linkis 的 Shell 执行字符串,通过我们定义的 shellCommandExecutor去 执行该 command,根据 command  运行的 task response 去获取到任务 ID,作为 App ID等,最终实现对 Linkis 任务的提交。如果有异常,可以把 Exit code 定义成失败或其他类型,

              Tips:
              通过 Linkis-cli 的 status 来实现。

              DolphinScheduler 在提交 Linkis 任务时会自动加入参数--async true 以实现异步提交。

              接下来讲 Build Command 的过程。

              首先就是刚才我们所说的 Share clean 的 options,就是刚才所定义的 o p t/soft/linkis 的目录,定义好之后,我们就可以连接刚才所说的 Linkis Client 的命令行,默认 Linkis 的异步提交参数,再把我 Prop 和 Value 键值对拼接到 ArrayList 中,然后一步步添加,组成 ArrayList 的 Command。解析完之后,大家可以看到,通过 Space 拆分成 Command,再提交到这个ShellCommandExecutor 里面执行。

              看起来很复杂的集成,其实主要就是用这个方法,只需要把 Command 构建好了之后放到我们所定义的上层类里面去执行,就可以了。

              接着我们来看这个监听任务状态的方法,它会每隔一段时间去运行该监听方法,根据对返回日志中 status 的检查,获取到最新的任务状态,并随着 Linkis 状态的改变来更新 DolphinScheduler 的任务状态。

              Tips:通过 Linkis-cli 的 status 来实现。

              当然 KILL 任务同理,我们也就不再赘述了。




              愿景


              可以看到,DolphinScheduler 对 Linkis 的集成其实非常简单,而像这样的集成还有很多,包括 PyTorch、Flink、Spark 等,希望大家多多参与到 DolphinScheduler 的开源共建中,我们鼓励任何形式的参与社区,最终成为 Committer 或 PPMC,如:

              • 将遇到的问题通过 github 上 issue 的形式反馈出来
              • 回答别人遇到的 issue 问题
              • 帮助完善文档
              • 帮助项目增加测试用例
              • 为代码添加注释
              • 提交修复 Bug 或者 Feature 的 PR
              • 发表应用案例实践、调度流程分析或者与调度相关的技术文章
              • 帮助推广 DolphinScheduler,参与技术大会或者 meetup 的分享等


              另外,本次演讲我们详细讲了 Dolphinscheduler 集成任务插件的流程,希望大家可以根据自己的使用情况,多多丰富我们的任务插件,让我们一起共建蓬勃的开源大数据生态!

              相信参与 DolphinScheduler,一定会让您从开源中受益!

              参与贡献


              随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。


              参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


              贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。


              社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


              非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


              如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html


              来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


              参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。

              添加社区小助手微信(Leonard-ds) 



              添加小助手微信时请说明想参与贡献。


              来吧,开源社区非常期待您的参与。



              < 🐬🐬 >
              更多精彩推荐

              DolphinScheduler 登陆 AWS AMI 应用市场!

              DolphinScheduler 机器学习工作流预测今年 FIFA 世界杯冠军大概率是荷兰!

              DolphinScheduler 快速构建 Hugging Face 文本分类工作流,基于工作流的机器学习训练部署太强了!

              DolphinScheduler 发布 3.0.3 版本,重点修复 6 个 Bug

              名额已排到10月 | Apache DolphinScheduler Meetup分享嘉宾继续火热招募中

              【Meetup讲师】您有一张社区认证讲师证书未领取,点击领取!

              Apache DolphinScheduler v2.0.1 Master 和 Worker 执行流程分析系列(三)



              我知道你在看

              文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论