暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

HTTP接口数据也能定时同步入湖?DolphinScheduler×SeaTunnel快速搞定!

海豚调度 2025-05-08
357

点击蓝字,关注我们

1

背景与目标


我们之前曾评估使用过SeaTunnel做CDC入湖验证:SeaTunnel-CDC入湖实践,这些场景都是能直连数据库的场景,业务需求中经常会出现无法直连数据库做CDC进行数据同步的场景,而这些场景就需要使用API进行数据对接,用Apache DolphinScheduler定时同步数据。

举个实际中的例子:

  • ERP(SAP)的库存数据进行同步入湖仓做库存分析

同时,本次目标希望其他同事能依样画葫芦,在以后的对接http接口到湖仓的时候能够自行完成,而非每遇到一个对接需求,就需要通过代码方式进行对接。


2

准备工作

  • seatunnel 2.3.10

首先,您需要在${SEATUNNEL_HOME}/config/plugin_config
文件中加入连接器名称,然后,执行命令来安装连接器,确认连接器在${SEATUNNEL_HOME}/connectors/目录下即可。

本例中我们会用到:connector-jdbc
connector-paimon

写入StarRocks也可以使用connector-starrocks
,本例中的场景比较适合用connector-jdbc
,所以使用connector-jdbc

# 配置连接器名称
--connectors-v2--
connector-jdbc
connector-starrocks
connector-paimon
--end--

# 安装连接器
sh bin/install-plugin.sh 2.3.10


3

SeaTunnel任务

我们先至少保证能在本地完成SeaTunnel任务,再完成对Apache DolphinScheduler的对接。

  • http to starRocks
    example/http2starrocks
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Http {
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
      Authorization = "Basic XXX"
      Content-Type = "application/json"
    }
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    content_field = "$.ET_RETURN.*"
    schema {
      fields {
        MATNR = "string"
        MAKTX = "string"
        WERKS = "string"
        NAME1 = "string"
        LGORT = "string"
        LGOBE = "string"
        CHARG = "string"
        MEINS = "string"
        LABST = "double"
        UMLME = "double"
        INSME = "double"
        EINME = "double"
        SPEME = "double"
        RETME = "double"
      }
    }
  }
}

# 此转换操作主要用于字段从命名等方便用途
transform {
  Sql {
    plugin_input = "stock"
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
  }
}

# 连接starRocks 进行数据分区覆写,本例适用starRocks建表,按照分区insert overwrite 覆写
sink {
    jdbc {
        plugin_input = "stock-tf-out"
        url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "lab"
        password = "XXX"
        compatible_mode="starrocks"
        query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
        }
}

# connector-starrocks进行对接 (未看到支持sql语句进行数据insert overwrite,本例子场景不适合),比较适合表数据全部删除重建场景
// sink {
//   StarRocks {
//     plugin_input = "stock-tf-out"
//     nodeUrls = ["ip:8030"]
//     base-url = "jdbc:mysql://ip:9030/"
//     username = "lab"
//     password = "XXX"
//     database = "scm"
//     table = "ods_sap_stock"
//     batch_max_rows = 1000
//     data_save_mode="DROP_DATA"
//     starrocks.config = {
//       format = "JSON"
//       strip_outer_array = true
//     }
//     schema_save_mode = "RECREATE_SCHEMA"
//     save_mode_create_template="""
//       CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (
//         MATNR STRING COMMENT '物料',
//         WERKS STRING COMMENT '工厂',
//         LGORT STRING COMMENT '库存地点',
//         MAKTX STRING COMMENT '物料描述',
//         NAME1 STRING COMMENT '工厂名称',
//         LGOBE STRING COMMENT '地点描述',
//         CHARG STRING COMMENT '批次编号',
//         MEINS STRING COMMENT '单位',
//         LABST DOUBLE COMMENT '非限制使用库存',
//         UMLME DOUBLE COMMENT '在途库存',
//         INSME DOUBLE COMMENT '质检库存',
//         EINME DOUBLE COMMENT '受限制使用的库存',
//         SPEME DOUBLE COMMENT '已冻结的库存',
//         RETME DOUBLE COMMENT '退货'
//       ) ENGINE=OLAP
//       PRIMARY KEY ( MATNR,WERKS,LGORT)
//       COMMENT 'sap库存'
//       DISTRIBUTED BY HASH (WERKS) PROPERTIES (
//       "replication_num" = "1"
//       )
//     """
//   }
// }

  • http to paimon
    example/http2paimon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Http {
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
      Authorization = "Basic XXX"
      Content-Type = "application/json"
    }
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    content_field = "$.ET_RETURN.*"
    schema {
      fields {
        MATNR = "string"
        MAKTX = "string"
        WERKS = "string"
        NAME1 = "string"
        LGORT = "string"
        LGOBE = "string"
        CHARG = "string"
        MEINS = "string"
        LABST = "double"
        UMLME = "double"
        INSME = "double"
        EINME = "double"
        SPEME = "double"
        RETME = "double"
      }
    }
  }
}
# 此转换操作主要用于字段从命名等方便用途
transform {
  Sql {
    plugin_input = "stock"
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
  }
}

# 连接paimon进行数据同步,paimon 暂时 未看到有支持 insert overwrite 分区覆写,此例仅作为参考,不适用本此例子需求
sink {
  Paimon {
    warehouse = "s3a://test/"
    database = "sap"
    table = "ods_sap_stock"
    paimon.hadoop.conf = {
        fs.s3a.access-key=XXX
        fs.s3a.secret-key=XXX
        fs.s3a.endpoint="http://minio:9000"
        fs.s3a.path.style.access=true
        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
  }
}



4

DolphinScheduler集成

SeaTunnel

  • 制作worker镜像
FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker:3.2.2
RUN mkdir opt/seatunnel
RUN mkdir opt/seatunnel/apache-seatunnel-2.3.10
# 容器集成seatunnel
COPY apache-seatunnel-2.3.10/ opt/seatunnel/apache-seatunnel-2.3.10/

打包镜像,推送到镜像仓库

docker build --platform=linux/amd64 -t apache/dolphinscheduler-worker:3.2.2-seatunnel .

  • 使用新镜像部署一个worker,此处修改docker-compose.yaml
    ,增加一个dolphinscheduler-worker-seatunnel
    节点。
...
  dolphinscheduler-worker-seatunnel:
    image: xxx/dolphinscheduler-worker:3.2.2-seatunnel
    profiles: ["all"]
    env_file: .env
    healthcheck:
      test: [ "CMD", "curl", "http://localhost:1235/actuator/health" ]
      interval: 30s
      timeout: 5s
      retries: 3
    depends_on:
      dolphinscheduler-zookeeper:
        condition: service_healthy
    volumes:
      - ./dolphinscheduler-worker-seatunnel-data:/tmp/dolphinscheduler
      - ./dolphinscheduler-logs:/opt/dolphinscheduler/logs
      - ./dolphinscheduler-shared-local:/opt/soft
      - ./dolphinscheduler-resource-local:/dolphinscheduler
    networks:
      dolphinscheduler:
        ipv4_address: 172.15.0.18
...

  • DolphinScheduler配置SeaTunnel分组及环境配置
    • 安全中心-Worker分组管理,创建一个这个节点ip的分组,用于以后需要seatunnel的任务跑该分组

    • 环境管理-创建环境,增加一个用于执行seatunnel的环境,同时需要绑定Worker分组为上一步创建的seatunnel分组

    • 创建工作流定义,把上面的seatunnel任务配置填写上

    • 运行时候,选择seatunnel的worker分组和环境即可跑在这个集成了seatunnel的环境上

转载自俊瑶先森
原文链接:https://junyao.tech/posts/9c6867c5.html





用户案例



网易邮箱 每日互动 惠生工程  作业帮 
博世智驾 蔚来汽车 长城汽车集度长安汽车
思科网讯食行生鲜联通医疗联想
新网银行唯品富邦消费金融 
自如有赞伊利当贝大数据
珍岛集团传智教育Bigo
YY直播  三合一太美医疗
Cisco Webex兴业证券




迁移实战



Azkaban   Ooize(当贝迁移案例)
Airflow (有赞迁移案例)
Air2phin(迁移工具)
Airflow迁移实践



发版消息




Apache DolphinScheduler 3.2.2版本正式发布!
Apache DolphinScheduler 3.2.1 版本发布:增强功能与安全性的全面升级
Apache DolphinScheduler 3.3.0 Alpha发布,功能增强与性能优化大升级!




加入社区



关注社区的方式有很多:

  • GitHub: https://github.com/apache/dolphinscheduler
  • 官网:https://dolphinscheduler.apache.org/en-us
  • 订阅开发者邮件:dev@dolphinscheduler@apache.org
  • X.com:@DolphinSchedule
  • YouTube:https://www.youtube.com/@apachedolphinscheduler
  • Slack:https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1cmrxsio1-nJHxRJa44jfkrNL_Nsy9Qg

同样地,参与Apache DolphinScheduler 有非常多的参与贡献的方式,主要分为代码方式和非代码方式两种。

📂非代码方式包括:

完善文档、翻译文档;翻译技术性、实践性文章;投稿实践性、原理性文章;成为布道师;社区管理、答疑;会议分享;测试反馈;用户反馈等。

👩‍💻代码方式包括:

查找Bug;编写修复代码;开发新功能;提交代码贡献;参与代码审查等。

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3A%22first+time+contributor%22

优先级问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3Apriority%3Ahigh

如何参与贡献链接https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/%E8%B4%A1%E7%8C%AE%E6%8C%87%E5%8D%97_menu/%E5%A6%82%E4%BD%95%E5%8F%82%E4%B8%8E_menu

如果你❤️小海豚,就来为我点亮Star吧!

https://github.com/apache/dolphinscheduler


你的好友秀秀子拍了拍你

并请你帮她点一下“分享”

文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论