暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

干货教程 | DolphinScheduler 中的函数使用与扩展

海豚调度 2023-02-09
1178




点击蓝字 关注我们




作者 | 王继鹏 Apache DolphinScheduler Committer

Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。

我们在使用Apache DolphinScheduler时通常会在某个逻辑中使用一些有规律的时间来指定业务时间或者执行时间,并且作用在单个任务上或者整个工作流上。

例如在工作流中设置一个全局参数用于输出上个月的今天,这时我们会用到函数:
    $[add_months(yyyyMMdd,-1)]

    在任务组件输出后我们得到这样的结果:

    很明显,在使用add_months函数之后,时间根据参数中的值进行了计算并且按照指定的时间格式输出。那如果我们想使用下个月、上一年或者下一周这种范围的时间,Apache DolphinScheduler官网对这块进行了描述:

    使用add_months()函数,该函数用于加减月份, 第一个入口参数为[yyyyMMdd],表示返回时间的格式 第二个入口参数为月份偏移量,表示加减多少个月
    • 后 N 年:$[add_months(yyyyMMdd,12*N)]
    • 前 N 年:$[add_months(yyyyMMdd,-12*N)]
    • 后 N 月:$[add_months(yyyyMMdd,N)]
    • 前 N 月:$[add_months(yyyyMMdd,-N)]
    直接加减数字 在自定义格式后直接“+/-”数字
    • 后 N 周:$[yyyyMMdd+7*N]
    • 前 N 周:$[yyyyMMdd-7*N]
    • 后 N 天:$[yyyyMMdd+N]
    • 前 N 天:$[yyyyMMdd-N]
    • 后 N 小时:$[HHmmss+N/24]
    • 前 N 小时:$[HHmmss-N/24]
    • 后 N 分钟:$[HHmmss+N/24/60]
    • 前 N 分钟:$[HHmmss-N/24/60]

    通过以上函数我们可以在Apache DolphinScheduler中顺利地完成我们的业务。但是如果Apache DolphinScheduler提供的函数已经不能支持现有的业务,那我们应该怎么做?

    例如获取上个季度第一个月的最后五天,最终输出的结果应该是20220727、20220728、20220729、20220730、20220731。如果再复杂一些,获取上个季度第一个月的最后五个工作日,那么最终输出的结果应该是
    20220725、20220726、20220727、20220728、20220729,因为20220730和20220731是非工作日。

    这个场景可以得到的线索是:时间范围、时间偏移量、结果时间范围和是否为工作日,Apache DolphinScheduler目前只支持自然日的时间函数,所以我们要在Apache DolphinScheduler中扩展函数计算使它可以支持复杂的业务场景。

    幸运的是Apache DolphinScheduler对外提供了这个扩展点:
    位置在dolphinscheduler-service模块中,src/main/java/org/apache/dolphinscheduler/service/expand路径下,timeFunctionNeedExpand方法用于判断当前函数是否需要在扩展的函数中计算,timeFunctionExtension方法用于对扩展的函数表达式做最终的计算并返回计算结果。
      时间函数扩展
      public interface TimePlaceholderResolverExpandService {
      boolean timeFunctionNeedExpand(String placeholderName);
      String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName);
      }

      在同级目录下对时间函数扩展接口做了默认的实现,默认不需要做函数扩展计算和扩展计算返回NULL。
        时间函数默认扩展实现
        @Component
        public class TimePlaceholderResolverExpandServiceImpl implements TimePlaceholderResolverExpandService {
        @Override
        public boolean timeFunctionNeedExpand(String placeholderName) {
        return false;
        }
        @Override
        public String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName) {
        return null;
        }
        }

        看到这里,上面复杂业务场景中的时间计算实现已经完成了一半,另一半就是对扩展方法做一个自定义的实现:
          自定义函数实现
          public class CustomerTimePlaceholderResolverExpandServiceImpl implements TimePlaceholderResolverExpandService {
          @Override
          public boolean timeFunctionNeedExpand(String placeholderName) {
          // Increase the identification of extension functions
          return true;
          }
          @Override
          public String timeFunctionExtension(FunctionExpandContent functionExpandContent) {
          try {
          // Add the analysis and calculation logic of user-defined functions
          } catch (Exception e) {
          log.error("time function extension error{}", functionExpandContent, e);
          }
          return null;
          }
          }
          最后,我们通过实现自定义函数计算逻辑满足了业务场景的需要,但是有一点我要提醒的是,因为业务的复杂与不同可能需要更多的参数,这个时候再去维护历史的函数计算未免太过麻烦。

          建议大家在使用时将所需参数优化为一个聚合的对象,无论后续业务如何变化,只需在相应的场景做字段增加即可,代码如下:
            时间函数扩展优化
            public interface TimePlaceholderResolverExpandService {
            boolean timeFunctionNeedExpand(String placeholderName);
            String timeFunctionExtension(FunctionExpandContent functionExpandContent);
            }
            public class FunctionExpandContent {
            private boolean global;
            private String parameters;
            private Integer processInstanceId;
            private String timezone;
            private String placeholderName;
            private MapparamsMap;
            }

            参与贡献


            随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。


            参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


            贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。


            社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


            非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


            如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html


            来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


            参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。

            添加社区小助手微信(Leonard-ds) 



            添加小助手微信时请说明想参与贡献。


            来吧,开源社区非常期待您的参与。



            < 🐬🐬 >
            更多精彩推荐

            在 AWS 上部署无服务器 Apache DolphinScheduler 任务调度系统

            优秀用户案例有奖征集 | 活动火热开启,快来投稿!

            DolphinScheduler 快速构建 Hugging Face 文本分类工作流,基于工作流的机器学习训练部署太强了!

            最新性能测试 | Apache DolphinScheduler 每分钟调度任务并发是 Apache Airflow 2 倍

            名额已排到10月 | Apache DolphinScheduler Meetup分享嘉宾继续火热招募中

            非代码的贡献也能成为Committer,我与DolphinScheduler社区的故事

            分布式可视化作业调度平台 DolphinScheduler MasterServer 设计核心要点揭秘



            我知道你在看

            文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论