暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

动态线程共享,SeaTunnel-TaskExecutionServer 的设计思路

SeaTunnel 2022-12-08
1546
点亮 ⭐️ Star · 照亮开源之路
https://github.com/apache/incubator-seatunnel


    

本文将介绍Apache SeaTunnel的TaskExecutionServer设计思路。


TaskExecutionServer是执行Task的具体服务,它会在每个节点上运行一个实例。


TaskExecutionServer不仅负责接收来自JobMaster下发的TaskGroup并运行其中的Task,还维护着TaskID->TaskContext的映射关系。


对Task的具体操作,则封装在TaskContext中。


Task内部持有OperationService,这意味着Task可以通过OperationService 对其他Task或者JobMaster等进行远程调用和通信。


01

TaskGroup


一个TaskGroup中的task都运行在同一个节点上。

一个优化点:
在同一个TaskGroup内部的Task之间的数据通道使用本地Queue,不同的TaskGroup之间因为可能会在不同节点上执行所以数据通道使用分布式Queue(hazalcast Ringbuffer)。

02

Task设计


Task最重要的一个方法是call()方法,执行器通过调用Task的call()方法去驱动Task的运行。call()方法会有一个返回值ProgressState,执行器通过ProgressState可以判断这个Task是否结束,以及是否需要继续调用call() 方法。

03

Thread Share优化


背景:在大量小任务同步的场景下,会产生大量的Task。若每个Task由一个线程负责会运行大量线程造成资源浪费,这时候如果一个线程可以运行多个Task会大大改善这种情况。但如何才能做到一个线程同时执行多个Task呢?

因为Task内部是通过一次次调用Call()方法进行驱动的,可以由一个线程轮流的去调用它所负责所有Task的Call()方法。


这样也会带来一个问题,就是一个task的call()方法执行时间非常长的话。这样就会一直占用这个线程,导致其他Task的延迟非常高。

针对这样的问题暂时想到以下两个优化方案:

1

Marking Thread Share


在Task上提供标识,标识这个Task是否支持线程共享(Thread Share)。在Task具体实现时去标识这个Task是否支持线程共享,可以共享的Task会去共享一个线程执行,不可以共享的会独享一个线程执行。

标识Task是否支持线程共享是由Task的具体实现者去评估,依据Call方法执行时间,若Call方法的执行实现都在ms级别,那么就可以标识这个Task为支持线程共享。

2

Dynamic Thread Share


上述方案一有一个根本的问题,就是往往Call方法的执行时间是不固定的,Task自己也不是很清楚自己Call()方法的调用时间,因为在不同的阶段、不同的数据量下等等都会影响Call方法的执行时间。这样的Task标识为支持共享线程和不支持其实都不是很合适,因为如果标识为可共享线程的情况下如果某次调用Call方法的执行时间非常长,这样就会导致其他共享当前线程的Task的延迟会非常高。若不支持共享那么依然没有解决资源浪费的问题。

所以可以让Task线程共享变成动态的,并且一组Task由一个线程池负责执行(Task数量 >> 线程数)。在thread1执行的过程中若Task1的call()方法执行时间超过了设定值(100ms),这时候就会从线程池取出一个线程thread2去执行下一个Task2的Call方法。保证不会因为Task1执行时间太长从而导致其他Task延迟太高。

Task2的call方法在超时时间内正常执行完成时,会将Task2放回任务队列尾,并且thread2继续从任务队列中拿出Task3执行Call方法。当Task1的call方法执行完成后,会将thread1讲放回线程池,并标记Task1超时一次。当某个TaskCall方法执行超时次数达到某个限制时会将该Task移除共享线程任务队列,独享一个线程。

Step1 :开始执行,会从线程池拿一个线程,开始执行任务队列中的任务

Step2 : 若Task1执行时间超过设定值,会再拿一个线程出来执行task2保证其他Task延迟不会受Task1影响

Step3 : 若Task2正常执行完成,将会被放到队尾,等待下次执行。同时Thread2会从队列中再拿出一个TasK执行

Step4 : 这时候如果Task1执行完成,Task1会被放在队尾,Thread1也会被放回线程池

Step5 : 如果Task1多次执行超时,那么它会被移除任务队列,独享一个线程执行

在目前的SeaTunnel Engine 中实现了方案二,后续还将继续优化,希望给大家带来更好的使用体验。



Apache SeaTunnel


Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

仓库地址: 
https://github.com/apache/incubator-seatunnel

网址:
https://seatunnel.apache.org/

Proposal:
https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro

Apache SeaTunnel(Incubating)  下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/incubator-seatunnel/issues

贡献代码:
https://github.com/apache/incubator-seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

订阅用户交流邮件列表:
users-subscribe@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

精彩推荐




Apache SeaTunnel 获评「2022 年度优秀开源技术团队」




社区又迎来一位 Commiter!




比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布



点击“阅读原文”参与共建!

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论