点击蓝字 关注我们

开源消息
近日,调度系统迁移工具 Air2phin 宣布开源。借助 Air2phin,用户可 2 步将调度系统从 Airflow 迁移至 Apache DolphinScheduler,为有调度系统迁移需要的用户带来极大便利。
Air2phin是什么?
注释
AST 是 Abstract Syntax Tree(抽象语法树)的缩写,它是一种以树状结构表示代码语法结构的数据结构。在编译器中,AST 是由词法分析器和语法分析器生成的。词法分析器将源代码转换成标记流(token stream),语法分析器将标记流转换成抽象语法树。AST 是一种树状结构,它由一系列节点组成,每个节点表示代码中的一个语法结构(如表达式、语句、函数、类等),节点之间的关系表示语法结构之间的嵌套关系。
为什么开源Air2phin?
为什么要从Airflow 迁移至DolphinScheduler?
01
什么是工作流编排系统?
02
Airflow是什么?


Air2phin 如何安装和使用
python -m pip install --upgrade air2phin
一个简单的例子


air2phin migrate --inplace tutorial_part.py
将 airflow.DAG 转换成 pydolphinscheduler.core.process_definition.ProcessDefinition,这个规则在第三行(import语句)以及第六行 DAG context 将 airflow.operators.bash.BashOperator 转换成 pydolphinscheduler.tasks.shell.Shell,这个规则在任务 t1,t2 中都被使用 除了对应的类转化之外,我们需要将类的属性进行转化,如将 airflow.DAG.schedule_interval 转换成了 ProcessDefinition.schedule,同时修改了部分值的内容,如将 timedelta(days=1) 转成 '0 0 0 * * ? *'
# 安装 apache-dolphinschedulerpython -m pip install apache-dolphinscheduler# 将工作流提交到 dolphinschedulerpython tutorial_part.py
工作原理
Airflow 如何工作:Airflow 工作流相关的信息都保存在 DAG 文件中,之后将 DAG 文件放置到 Airflow 的指定目录,Airflow 的 Scheduler 会间隔一定时间去扫描和解析 Airflow 的 DAG 文件,所以 DAG 文件是被动被扫描和更新的。

dolphinscheduler python sdk: 同 Airflow 类似,将全部工作流相关的信息都通过 Python 文件定义,但是 dolphinscheduler python sdk 是通过人为主动触发的方式,将工作流信息提交,运行命令 python 工作流文件名 即可完成主动任务提交。

Air2phin 工作流程

从标准输入或者文件中获取原本的 Airflow DAG 内容 从 Yaml 文件加载所有转换规则 将 Airflow DAG 内容通过 LibCST 解析成 CST 树 通过 LibCST Transformer 转换 dolphinscheduler python sdk 定义内容
Air2phin 最佳实践
01
迁移整个文件夹而不是单个文件
# 迁移整个 ~/airflow/dags 文件夹air2phin migrate --inplace ~/airflow/dags
02
增加自定义的规则
from airflow.providers.postgres.operators.postgres import PostgresOperatorclass MyCustomOperator(PostgresOperator):def __init__(self,*,sql: str | Iterable[str],my_custom_conn_id: str = 'postgres_default',autocommit: bool = False,parameters: Iterable | Mapping | None = None,database: str | None = None,runtime_parameters: Mapping | None = None,**kwargs,) -> None:super().__init__(sql=sql,postgres_conn_id=my_custom_conn_id,autocommit=autocommit,parameters=parameters,database=database,runtime_parameters=runtime_parameters,**kwargs,)
from custom.my_custom_operator import MyCustomOperatorwith DAG(dag_id='my_custom_dag',default_args=default_args,schedule_interval='@once',start_date=days_ago(2),tags=['example'],) as dag:t1 = MyCustomOperator(task_id='my_custom_task',sql='select * from table',my_custom_conn_id='my_custom_conn_id',)
name: MyCustomOperatordescription: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.migration:module:- action: replacesrc: custom.my_custom_operator.MyCustomOperatordest: pydolphinscheduler.tasks.sql.Sqlparameter:- action: replacesrc: task_iddest: name- action: replacesrc: my_custom_conn_iddest: datasource_name
# 指定自定义规则路径为 path/to/MyCustomOperator.yamlair2phin migrate --inplace --custom-rules path/to/MyCustomOperator.yaml ~/airflow/dags
03
让 Air2phin 运行地更快
# 指定 air2phin 启动 12 个进程同时进行转化air2phin migrate --inplace --custom-rules path/to/MyCustomOperator.yaml --multiprocess 12 ~/airflow/dags
存在的问题
airflow.DAG airflow.operators.bash.BashOperator airflow.operators.dummy_operator.DummyOperator airflow.operators.python_operator.PythonOperator airflow.operators.spark_sql_operator.SparkSqlOperator
Air2phin 常见问题解答
参与贡献
随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。

参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。
社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689
非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22
如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html
来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。
参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。

添加社区小助手微信(Leonard-ds)
添加小助手微信时请说明想参与贡献。
来吧,开源社区非常期待您的参与。
☞干货教程 | DolphinScheduler 中的函数使用与扩展
☞最新性能测试 | Apache DolphinScheduler 每分钟调度任务并发是 Apache Airflow 2 倍





