暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

解析 Apache SeaTunnel 的任务运行过程

SeaTunnel 2024-11-26
401


示例代码运行

感兴趣的朋友可以先看看这篇文章《记本地第一次运行SeaTunnel示例项目》:https://blog.csdn.net/u011924665/article/details/143373017

基于分支

2.3.5-Release

跟踪过程

入口

Apache SeaTunnel提供的官方Example基于SeaTunnel Engine
引擎的示例作为入口:org.apache.seatunnel.example.engine.SeaTunnelEngineExample

构建SeaTunnel引擎的命令行运行对象

这里默认会使用seatunnel-engine-example
子项目的resources
下的example/fanke\_to\_console.conf
这个任务配置文件,并且以本地方式运行。

首先解析任务配置文件,构建ClientCommandArgs
对象。

    public static void main(String[] args)
            throws FileNotFoundException, URISyntaxException, CommandException {
        String configurePath = args.length > 0 ? args[0] : "/examples/fake_to_console.conf";
        String configFile = getTestConfigFile(configurePath);
        ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
        clientCommandArgs.setConfigFile(configFile);
        clientCommandArgs.setCheckConfig(false);
        clientCommandArgs.setJobName(Paths.get(configFile).getFileName().toString());
        // Change Execution Mode to CLUSTER to use client mode, before do this, you should start
        // SeaTunnelEngineServerExample
        clientCommandArgs.setMasterType(MasterType.LOCAL);
        SeaTunnel.run(clientCommandArgs.buildCommand());
    }

最后一行中,ClientCommandArgs
buildCommand()
方法会根据任务配置文件构建出一个Command对象。

默认配置文件构建出来的就是一个ClientExecuteCommand(org.apache.seatunnel.core.starter.seatunnel.command包下)
对象,简而言之,这就是一个通过命令行执行的SeaTunnel引擎对象。

ClientExecuteCommand开始运行job

seatunnel-engine-example
示例代码中最后的SeaTunnel.run()
方法就是调用ClientExecuteCommand
execute()
方法开始,然后通过命令行的方式运行SeaTunnel的Job。

推荐以下步骤,对比源码一起阅读。

声明Job监控器对象的引用

JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null;

整个Execute
的核心内容都被包裹在一个try-catch-finally
代码块中,此处声明一个Job监控器是为了在后续的Finally代码块中获取到任务执行结束(无论是成功还是失败)后的统计信息。

获取SeaTunnel的配置

SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();

seatunnel-engine-example
中获取的是SeaTunnel任务配置,而在此处获取的是SeaTunnel的配置,生成配置的对象。

获取Haselcast的客户端的配置

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();

与加载SeaTunnel的配置类似,加载Hazelcast
Client
配置,生成配置对象。

本地方式运行

clusterName = creatRandomClusterName(
             StringUtils.isNotEmpty(clusterName)
                 ? clusterName : Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);

本地方式运行且没有设置集群名称,则会默认以SeaTunnel开头后面添加随机数,生成一个集群名称。

instance = createServerInLocal(clusterName, seaTunnelConfig);

然后创建一个本地的Hazelcast
客户端,创建成功之后,记录下来Hazelcast
的端口号,用于在执行SeaTunnel的Job时使用。

确认(记录)集群名称

将本次Job执行的集群名称记录到SeaTunnel和SeaTunnel Client
的配置对象中。

创建SeaTunnel Engine客户端

engineClient = new SeaTunnelClient(clientConfig);

创建一个SeaTunnel Engine的客户端,基于Hazelcast创建的。

创建出Engine Client对象后就是一列的判断。看起来是通过启动命令中指定来实现了,推测与任务恢复、状态等信息查询有关,暂未深入探究,感兴趣的小伙伴🙏可以研究研究。

准备执行Job

Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
JobConfig jobConfig = new JobConfig();

获取Job的配置文件,并对文件进行检查,然后创建Job配置对象。

jobExecutionEnv = engineClient
 .createExecutionContext(configFile.toString(), jobConfig, seaTunnelConfig);

使用Job配置对象,基于通过前面创建的SeaTunnel Engine
客户端创建一个Job运行环境对象jobExecutionEnv

开始执行

ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

通过调用jobExecutionEnv
execute()
方法开始执行Jb,并返回一个Job代理对象:ClientJobProxy。

execute()
方法的跟踪见:记第一次跟踪seatunnel的任务运行过程二——ClientJobExecutionEnvironment的execture方法

然后检查一下当前Job运行是否为异步模式,若为异步模式且不是本地模式运行,则流程结束。

若为非异步模式或任务是本地方式(seatunnel-engine-example中默认的是本地模式)执行,则继续执行以下流程。

注册任务取消的钩子函数

Runtime.getRuntime().addShutdownHook(
   new Thread(() -> { CompletableFuture<Void> future =
      CompletableFuture.runAsync(() -> {
       log.info("run shutdown hook because get close signal");
       shutdownHook(clientJobProxy);
   });
      try {
         future.get(15, TimeUnit.SECONDS);
      } catch (Exception e) {
         log.error("Cancel job failed.", e);
      }
   })
);

打印监控信息

启动一个新的定时线程,用于打印监控信息。

JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
executorService = Executors.newSingleThreadScheduledExecutor(
                                new ThreadFactoryBuilder()
                                        .setNameFormat("job-metrics-runner-%d")
                                        .setDaemon(true)
                                        .build());
executorService.scheduleAtFixedRate(
  jobMetricsRunner,
  0,
  seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
  TimeUnit.SECONDS
);

这个线程打印的就是如下的监控信息:

 ***********************************************
            Job Progress Information
 ***********************************************
 Job Id                    :  904173403390000097
 Read Count So Far         :               23412
 Write Count So Far        :               23412
 Average Read Count        :              1233/s
 Average Write Count       :              1233/s
 Last Statistic Time       : 2024-10-31 19:04:02
 Current Statistic Time    : 2024-10-31 19:05:02
 ***********************************************

等到Job执行结果

线程在此同步的等待Job的执行结果。

结尾

如果Job的结果的错误信息不为空或者Job的结果为失败,则抛出异常。

正常结束则打印出Job的统计信息,形式如下:

 ***********************************************
            Job Statistic Information
 ***********************************************
 Start Time                : 2024-10-31 19:02:18
 End Time                  : 2024-10-31 19:05:32
 Total Time(s)             :                 194
 Total Read Count          :              239202
 Total Write Count         :              239202
 Total Failed Count        :                   0
 ***********************************************







活动推荐

11月30日下午2点,社区邀请了“康源研究院”的康源研究院大数据架构师!一起跟大家聊聊他们是如何调研使用SeaTunnel,踩过哪些坑?如何从DataX迁移到SeaTunnel?








同步Demo

 MySQL→Doris
MySQLCDC
MySQL→Hive
 HTTP → Doris 
HTTP → MySQL

新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南
从 0 到 1 快速入门 Apache SeaTunnel 
初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel

 MySQL 同步到 Hive / 从MySQL同步到StarRocks
通过 SeaTunnel 将数据写入 OSS-HDFS 
MySQL 到 Elasticsearch 实时同步解决方案

启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 
 部署 Apache SeaTunnel 分布式集群
基于Apache SeaTunnel构建CDC数据同步管道
【保姆级教程】使用SeaTunnel同步Kafka的数据到ClickHouse
【数据同步】SeaTunnel初体验,5000字深入浅出带你用上Oracle-CDC

最佳实践

 OPPO 清风 天翼云 马蜂窝
孩子王 哔哩哔哩 唯品会
众安保险 兆原数通 亚信科技
映客 翼康济世 信也科技
华润置地

测试报告


 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!
最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!
比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布
SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

源码解析


Apache SeaTunnel Zeta引擎源码解析(一) Server端的初始化
Apache SeaTunnel Zeta引擎源码解析(二) Client端的任务提交流程
Apache SeaTunnel Zeta引擎源码解析(三) Server端接收任务的执行流程
全面解析 SeaTunnel API 源码:从入门到精通数据集成
从启动到关闭 | SeaTu源码解析nnel2.1.1源码解析
SeaTunnel 2.1.2 封装 Flink 连接数据库的源码解析
那些年,我们在Apache SeaTunnel 2.1.0部署中踩过的坑【含源码分析】


Apache SeaTunnel



Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台
仓库地址: 
https://github.com/apache/seatunnel
网址:
https://seatunnel.apache.org/
Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!
我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
提交问题和建议:
https://github.com/apache/seatunnel/issues
贡献代码:
https://github.com/apache/seatunnel/pulls
订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org
开发邮件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
关注 Twitter: 
https://twitter.com/ASFSeaTunnel

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论