暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

DolphinScheduler接口实操(一):利用接口实现高效批量工作流导入及脚本上线

海豚调度 2025-01-24
697

点击蓝字,关注我们

转载自风_间
实现了批量生成DolphinScheduler的任务,当导入时发现只能逐个导入,因此通过接口实现会更方便。


1



DolphinSchedululer接口文档


DolphinScheduler的接口文档地址:http://IP:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn
token:所有的接口都需要用到token

在安全中心-令牌管理 创建一个token 。记住这个token,后面所有的接口都需要用到 。
header:根据上面的token组成请求要用的header
    token = ''
    headers = {
    'Accept': 'application/json',
    'token': token
    }
    项目ID project_id 可以在查看项目工作流时,在url中找到。



    2



    导入任务接口


    导入任务的接口是
      import_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition/import'
      知道接口 就可以导入了
        def import_job(file_path):
        # 打开文件并读取为二进制数据
        with open(file_path, 'rb') as file:
        files = {'file': file}
        # 导入工作流
        response = requests.post(import_url, headers=headers, files=files)
        print(response.status_code)
        if response.status_code != 200:
        print('上传失败 '+file_path)
        需要注意的是,导入任务时 只支持二进制 。
        file_path 是工作流文件,具体实现 可以工作流中导出一个作为参考。
        重复使用上述方法,就可以实现批量导入任务。



        3



        工作流上线


        使用上述方法批量完成任务上传后,依旧有问题,逐个上线工作量也是个不小的工作量,因此继续使用接口。
        经过研究发现,上线工作流需要先获取工作流的调度ID 。
        获取工作流列表 - > 获取工作流code -> 获取所有工作流的调度ID -> 工作流上线
        获取工作流列表
        这是接口地址:
          jobs_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition'
          不过这个要分页查询,稍微有一点点麻烦。
            def get_jobs_list():
            # 分页查询
            # 初始化分页参数
            pageNo = 1
            pageSize = 10
            url = f'{jobs_url}?pageSize=10&pageNo=1&searchVal='
            # 构建完整的URL
            # 存储所有结果
            all_items = list()
            while True:
            # 构建完整的URL
            url = f'{jobs_url}?pageSize={pageSize}&pageNo={pageNo}&searchVal='




            # 发送GET请求
            response = requests.get(url, headers=headers)




            # 检查响应状态码
            if response.status_code == 200:
            # 请求成功,处理响应数据
            items = response.content.decode()
            total = json.loads(items)["data"]["total"]
            item = json.loads(items)["data"]["totalList"]
            # 将当前页的数据添加到结果列表中
            for i in item:
            all_items.append(i)




            # 如果当前页没有数据,退出循环
            if pageNo * pageSize > total:
            break
            if not items:
            break
            # 增加页码
            pageNo += 1
            else:
            # 请求失败,打印错误信息
            print('请求失败:', response.status_code, response.text)
            break




            return all_items


            all_items 是所有工作流的具体内容,需要提取一下:
              all_jobs = get_jobs_list()
              job_codes = [job['code'] for job in all_jobs]
              这样就是所有的工作流code。
              获取调度ID:
              下面是调度ID的接口,因为不想分页,直接一页1000个。
                schedules_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules?pageSize=1000&pageNo=1&processDefinitionCode='
                使用这个接口就能拿到所有的调度ID
                  def schedule_id(job_code):
                  url = schedules_url+str(job_code)
                  response = requests.get(url, headers=headers)
                  if response.status_code == 200:
                  data = response.content.decode()
                  js = json.loads(data)
                  if len(js['data']['totalList'])>0 and js['data']['totalList'][0]['releaseState']=='OFFLINE':
                  return js['data']['totalList'][0]['id']
                  else:return ''
                  这里过滤了已经上线的调度ID 。
                  上线:
                  万事俱备 终于可以上线了。
                    online_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules/{scheduler_id}/online'
                    具体实现:
                      def online_job(scheduler_id):
                      url = online_url.format(scheduler_id=scheduler_id)
                      response = requests.post(url, headers=headers)
                      if response.status_code == 200:
                      print('success')
                      else:
                      print('online job failed')


                      到此 就可以实现导入-批量全自动了。
                      打完收工,祝你不加班。                    
                      原文链接:https://blog.csdn.net/weixin_45399602/article/details/143226396
                      <🐬🐬 >

                      推荐阅读

                      用户实践案例
                      奇富科技  腾讯音乐 联通数科 拈花云科
                      蔚来汽车 长城汽车 集度 长安汽车
                      思科网讯 食行生鲜 联通医疗 联想
                      新网银行 唯品富邦消费金融  蜀海供应链 
                      自如 有赞 伊利 当贝大数据
                      珍岛集团 传智教育 Bigo
                      YY直播  作业帮 太美医疗
                      某新能源 中电信翼康
                      迁移实践
                      Azkaban   Ooize(当贝迁移案例)   
                      Airflow (有赞迁移案例) 
                      Air2phin(迁移工具)
                      Airflow迁移实践

                      新手入门
                      选择Apache DolphinScheduler的10个理由
                      Apache DolphinScheduler 3.1.8 保姆级教程【安装、介绍、项目运用、邮箱预警设置】轻松拿捏!
                      Apache DolphinScheduler 如何实现自动化打包+单机/集群部署?
                      DolphinScheduler快速上手:基于Docker Compose的安装与配置全攻略
                      Apache DolphinScheduler 在大数据环境中的应用与调优
                      Apache DolphinScheduler-3.2.0集群部署教程

                      < 🐬🐬 >
                      参与社区

                      参与Apache DolphinScheduler 社区有非常多的参与贡献的方式,包括:


                      贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

                      社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689

                      非新手问题列表:https://github.com/apache/dolphinscheduler/issues?
                      q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22

                      如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html

                      来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的!


                      球分享

                      球点赞

                      球在看

                      文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论