暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何快速在 Apache DolphinScheduler 新扩展一个任务插件?

海豚调度 2023-09-19
181

点击蓝字 关注我们

作者 | 代立冬
编辑 | Debra Chen

Apache DolphinScheduler 是现代数据工作流编排平台,具有非常强大的可视化能力,DolphinScheduler 致力于使数据工程师、分析师、数据科学家等数据工作者都可以简单轻松地搭建各种数据工作流,让数据处理流程更简单可靠。

DolphinScheduler 非常易于使用(easy to use),目前有四种创建工作流的方法:

  • 在 UI 界面上直接通过拖放任务的方式来创建任务
  • PyDolphinScheduler,通过 Python API 创建工作流,也就是 workflow as code 的方式
  • 编写 yaml 文件,通过 yaml 创建工作流(目前必须安装 PyDolphinScheduler)
  • 通过 Open API 的方式来创建工作流

以上 4 种总有一种方式适合您的场景!

得益于 DolphinScheduler 采用无中心化的整体架构设计,使得 DolphinScheduler 调度性能也是同类开源数据工作流编排平台的 5 倍以上,如果您正有这样的性能问题或者调度延时问题,也不妨试试 DolphinScheduler。

DolphinScheduler界面

好的,接下来言归正题,有不少用户想在 DolphinScheduler 扩展新的任务插件支持(比如添加 Kettle),DolphinScheduler 的任务插件体系是基于 SPI 来进行任务插件扩展的。

01

什么是 SPI 服务发现?


SPI 是 Service Provider Interface 的缩写,是一种常见的服务提供发现机制,比如知名的 OLAP 引擎 Presto 也是使用 SPI 来扩展的。在 java.util.ServiceLoader 的文档里有比较详细的介绍,其抽象的概念是指动态加载某个服务实现。

比如 java.sql.Driver 接口,不同厂商可以针对同一接口做出不同的实现,比如 MySQL 和 PostgreSQL 都有不同的实现提供给用户,而 Java 的 SPI 机制可以为某个接口寻找服务实现。Java 中 SPI 机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。

SPI 整体机制图如下:

SPI 机制中有 4 个重要的组件 :

  • 服务接口 Service Interface
  • 服务接口实现:不同的服务提供方可以提供一个或多个实现;框架或者系统本身也可以提供默认的实现
  • 提供者注册 API(Provider Registration API),这是提供者用来注册实现的
  • 服务访问 API (Service Access API) ,这是调用方用来获取服务的实例的接口

Apache DolphinScheduler 从 2.0 版本开始引入 SPI。将 Apache DolphinScheduler 的 Task 看成一个执行服务,而我们需要根据使用者的选择去执行不同的服务,如果没有的服务,则需要我们自己扩充,我们只需要完成我们的 Task 具体实现逻辑,然后遵守 SPI 的规则,编译成 Jar 并上传到指定目录,就可以使用我们自己编写的 Task 插件来执行具体的任务了。

02

谁在使用它?


除了前面提到的 Presto 外,还有以下技术都使用到 SPI 技术:

1、Apache DolphinScheduler
  • Task
  • Datasource
2、Apache Flink
  • Flink sql connector,用户实现了一个 Flink-connector 后,Flink 也是通过 SPI 来动态加载的
3、SpringBoot
  • Spring boot spi
4、JDBC
  • JDBC4 也基于 SPI 的机制来发现驱动提供商了,可以通过META-INF/services/java.sql.Driver 文件里指定实现类的方式来暴露驱动提供者
5、更多
  • common-logging

03

DolphinScheduler SPI工作流程


如上图,Apache DolphinScheduler 中有 2 种 Task : 逻辑 Task 和物理 Task,逻辑 Task 指 Dependent Task,Switch Task 这种控制工作流逻辑的任务插件;物理 Task 是指 Shell Task,SQL Task ,Spark Task ,Python Task 等这种执行具体任务的 Task。

在 Apache DolphinScheduler 中,我们一般扩充的都是物理 Task,物理 Task 都是由 Worker 来调用并执行的,当启动 Worker 服务时,Worker 会来加载相应的实现了规则的 Task lib,HiveTask 被 Apache DolphinScheduler TaskPluginManage 加载了。SPI 的规则图上也有描述,也可以参考 java.util.ServiceLoader 类。

04

如何扩展一个任务插件?


4.1 创建 Maven 项目
    mvn archetype:generate \
    -DarchetypeGroupId=org.apache.dolphinscheduler \
    -DarchetypeArtifactId=dolphinscheduler-hive-client-task \
    -DarchetypeVersion=1.10.0 \
    -DgroupId=org.apache.dolphinscheduler \
    -DartifactId=dolphinscheduler-hive-client-task \
    -Dversion=0.1 \
    -Dpackage=org.apache.dolphinscheduler \
    -DinteractiveMode=false

    4.2 Maven 依赖

      org.apache.dolphinscheduler
      dolphinscheduler-spi
      ${dolphinscheduler.lib.version}
      ${common.lib.scope}








      org.apache.dolphinscheduler
      dolphinscheduler-task-api
      ${dolphinscheduler.lib.version}
      ${common.lib.scope}

      4.3 创建 Task 通道工厂(TaskChannelFactory)

      org.apache.dolphinscheduler.spi.task.TaskChannel

      插件实现以上接口即可。主要包含创建任务(任务初始化,任务运行等方法)、任务取消,如果是 yarn 任务,则需要实现 org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask。

      我们在 dolphinscheduler-task-api 模块提供了所有任务对外访问的 API,而 dolphinscheduler-spi 模块则是 spi 通用代码库,定义了所有的插件模块,比如告警模块,注册中心模块等,你可以详细阅读查看。

      首先我们需要创建任务服务的工厂,其主要作用是帮助构建 TaskChannel 以及 TaskPlugin 参数,同时给出该任务的唯一标识,ChannelFactory 在 Apache DolphinScheduler 的 Task 服务组中,其作用属于是在任务组中的承上启下,交互前后端以及帮助 Worker 构建 TaskChannel。
        package org.apache.dolphinscheduler.plugin.task.hive;
        import org.apache.dolphinscheduler.spi.params.base.PluginParams;
        import org.apache.dolphinscheduler.spi.task.TaskChannel;
        import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
        import java.util.List;
        public class HiveClientTaskChannelFactory implements TaskChannelFactory {
        /**
        * Create task channel, execute task through this channel
        * @return task channel
        */
        @Override
        public TaskChannel create() {
        return new HiveCliTaskChannel();
        }
        /**
        * Returns the global unique identifier of this task
        * @return task name
        */
        @Override
        public String getName() {
        return "HIVECLI";
        }
        /**
        * Parameters required for front-end pages
        * @return
        */
        @Override
        public List getParams() {
        return null;
        }
        }

        4.4 创建 TaskChannel

        有了工厂之后,我们会根据工厂创建出 TaskChannel,TaskChannel 包含如下两个方法,一个是取消,一个是创建,目前不需要关注取消,主要关注创建任务。
         
             void cancelApplication(boolean status);
          /**
          * 构建可执行任务
          */
          AbstractTask createTask(TaskRequest taskRequest);
          public class HiveClientTaskChannel implements TaskChannel {
          @Override
          public void cancelApplication(boolean b) {
          //do nothing
          }
          @Override
          public AbstractTask createTask(TaskRequest taskRequest) {
          return new HiveClientTask(taskRequest);
          }
          }

          4.5 构建 Task 实现

          通过 TaskChannel 我们得到了可执行的物理 Task,但是我们需要给当前 Task 添加相应的实现,才能够让Apache DolphinScheduler 去执行你的任务,首先在编写 Task 之前我们需要先了解一下 Task 之间的关系:

          通过上图我们可以看到,基于 Yarn 执行任务的 Task 都会去继承 AbstractYarnTask,不需要经过 Yarn 执行的都会去直接继承 AbstractTaskExecutor,主要是包含一个 AppID,以及 CanalApplication setMainJar 之类的方法,想知道的小伙伴可以自己去深入研究一下,如上可知我们实现的 HiveClient 就需要继承 AbstractYarnTask,在构建 Task 之前,我们需要构建一下适配 HiveClient 的 Parameters 对象用来反序列化JsonParam。
            package com.jegger.dolphinscheduler.plugin.task.hive;
            import org.apache.dolphinscheduler.spi.task.AbstractParameters;
            import org.apache.dolphinscheduler.spi.task.ResourceInfo;
            import java.util.List;
            public class HiveClientParameters extends AbstractParameters {
            /**
            * 用HiveClient执行,最简单的方式就是将所有SQL全部贴进去即可,所以我们只需要一个SQL参数
            */
            private String sql;
            public String getSql() {
            return sql;
            }
            public void setSql(String sql) {
            this.sql = sql;
            }
            @Override
            public boolean checkParameters() {
            return sql != null;
            }
            @Override
            public List getResourceFilesList() {
            return null;
            }
            }
            实现了 Parameters 对象之后,我们具体实现 Task,例子中的实现比较简单,就是将用户的参数写入到文件中,通过 Hive -f 去执行任务。
              package org.apache.dolphinscheduler.plugin.task.hive;
              import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
              import org.apache.dolphinscheduler.spi.task.AbstractParameters;
              import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
              import org.apache.dolphinscheduler.spi.utils.JSONUtils;
              import java.io.BufferedWriter;
              import java.io.IOException;
              import java.nio.charset.StandardCharsets;
              import java.nio.file.Files;
              import java.nio.file.Path;
              import java.nio.file.Paths;
              public class HiveClientTask extends AbstractYarnTask {
              /**
              * hive client parameters
              */
              private HiveClientParameters hiveClientParameters;
              /**
              * taskExecutionContext
              */
              private final TaskRequest taskExecutionContext;
              public HiveClientTask(TaskRequest taskRequest) {
              super(taskRequest);
              this.taskExecutionContext = taskRequest;
              }
              /**
              * task init method
              */
              @Override
              public void init() {
              logger.info("hive client task param is {}", JSONUtils.toJsonString(taskExecutionContext));
              this.hiveClientParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HiveClientParameters.class);
              if (this.hiveClientParameters != null && !hiveClientParameters.checkParameters()) {
              throw new RuntimeException("hive client task params is not valid");
              }
              }
              /**
              * build task execution command
              *
              * @return task execution command or null
              */
              @Override
              protected String buildCommand() {
              String filePath = getFilePath();
              if (writeExecutionContentToFile(filePath)) {
              return "hive -f " + filePath;
              }
              return null;
              }
              /**
              * get hive sql write path
              *
              * @return file write path
              */
              private String getFilePath() {
              return String.format("%s/hive-%s-%s.sql", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskName(), this.taskExecutionContext.getTaskInstanceId());
              }
              @Override
              protected void setMainJarName() {
              //do nothing
              }
              /**
              * write hive sql to filepath
              *
              * @param filePath file path
              * @return write success?
              */
              private boolean writeExecutionContentToFile(String filePath) {
              Path path = Paths.get(filePath);
              try (BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
              writer.write(this.hiveClientParameters.getSql());
              logger.info("file:" + filePath + "write success.");
              return true;
              } catch (IOException e) {
              logger.error("file:" + filePath + "write failed.please path auth.");
              e.printStackTrace();
              return false;
              }
              }
              @Override
              public AbstractParameters getParameters() {
              return this.hiveClientParameters;
              }
              }
              4.6 遵守 SPI 规则

                # 1,Resource下创建META-INF/services文件夹,创建接口全类名相同的文件
                zhang@xiaozhang resources % tree ./
                ./
                └── META-INF
                └── services
                └── org.apache.dolphinscheduler.spi.task.TaskChannelFactory
                # 2,在文件中写入实现类的全限定类名
                zhang@xiaozhang resources % more META-INF/services/org.apache.dolphinscheduler.spi.task.TaskChannelFactory
                org.apache.dolphinscheduler.plugin.task.hive.HiveClientTaskChannelFactory

                4.7 打包和部署

                  ## 1,打包
                  mvn clean install
                  ## 2,部署
                  cp ./target/dolphinscheduler-task-hiveclient-1.0.jar $DOLPHINSCHEDULER_HOME/lib/
                  ## 3,restart dolphinscheduler server

                  以上操作完成后,我们查看 worker 日志 tail -200f $Apache DolphinScheduler_HOME/log/Apache DolphinScheduler-worker.log

                  Apache DolphinScheduler 的插件开发就到此完成~涉及到前端的修改可以参考:
                  Apache DolphinScheduler-ui/src/js/conf/home/pages/dag/_source/formModel/

                  • NOTICE:目前任务插件的前端还没有实现,因此你需要单独实现插件对应的前端页面。

                  TaskChannelFactory 继承自 PrioritySPI,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写 getIdentify 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 IllegalArgumentException。

                  如果任务插件存在类冲突,你可以采用 Shade-Relocating Classes(https://maven.apache.org/plugins/maven-shade-plugin/)来解决这种问题。

                  如果您有兴趣试试 Apache DolphinScheduler ,欢迎微信添加小助手 Leonard-ds 加入 DolphinScheduler Slack: https://s.apache.org/dolphinscheduler-slack, 我将免费全力支持您!


                  参与贡献


                  随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。


                  参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


                  贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。


                  社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


                  非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


                  如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html


                  来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


                  参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。

                  添加社区小助手微信(Leonard-ds,好友申请注明“入交流群+姓名+公司+职位信+是否是用户”,群里是实名制,仅用于验证身份) 



                  如果想参与贡献,添加小助手微信时请说明想参与贡献。


                  来吧,开源社区非常期待您的参与。

                  < 🐬🐬 >
                  更多精彩推荐

                  Apache DolphinScheduler 支持使用 OceanBase 作为元数据库啦!

                  实操教程 | 触发器实现 Apache DolphinScheduler 失败钉钉自动告警

                  又一轮优秀用户案例有奖征集,快来投稿!

                  ☞去年办了这么多场Meetup都没有你,2023年赶紧安排起来!

                  用户案例 | 蜀海供应链基于 Apache DolphinScheduler 的数据表血缘探索与跨大版本升级经验

                  3.2.0 版本预告!远程日志解决 Worker 故障获取不到日志的问题



                  点击阅读转发在看

                  文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论