TaskFlow 是 OpenStack 项目下的一个 python 公共库,给 OpenStack 和其他的 python 项目提供一个简单、统一、可靠、和可扩展的任务执行框架。
它能够跟踪和控制应用程序中复杂逻辑任务的执行、暂停、重启、恢复以及回滚,保证任务执行的可靠性和一致性,同时提供了计量、执行记录、进度和状态追踪的能力,给 OpenStack 的长流程任务和复杂状态转换过程提供了一个统一的解决方案,目前包括 Cinder、Glance、Neutron、Octavia 等项目都使用了 TaskFlow 来管理各自复杂的业务流程,比如 create volume、import image 等。
create volume 流程

设计架构
TaskFlow 本身的实现比较复杂,它把预先定义好的一系列小 Task 按一定顺序和规则组合成一个或多个 Workflow,经过 Engine 载入后同步执行或调度给执行节点异步执行,同时提供执行过程和任务状态的追踪能力,可以根据预定义策略对异常任务进行回滚或重试处理。

TaskFlow 分为提供同步执行能力的基础部分(上图左半部分)和依赖外部支撑提供异步运行的可选部分(右半部分),目前 OpenStack 的组件主要用的还是基础部分的功能。
基本概念
Atom:原子,TaskFlow 里的最小执行单位,抽象基类,派生出Task、Retry等不同的任务类型。
Task:任务,定义了执行 execute() 和回滚 revert() 操作的工作单元,所有开发者自定义的 task 都必须继承 taskflow.task.Task 类。例子:

Flow:任务流,以一定的次序组织和运行 Task,失败时触发关联的 Retry 进行回滚。Flow 可以包含 Task,也可以嵌套其他 Flow,有三种类型:
Linear(linear_flow.Flow):线性流,task/flow 按定义的顺序依次执行,按倒序依次回滚
Unordered(unordered_flow.Flow):无序流,task/flow 可能按照任意的顺序执行和回滚
Graph(graph_flow.Flow):图流,task/flow 按定义时的 provides/requires 参数隐含依赖关系或者显式添加的 link 关系来执行和回滚
Retry:重试,和 Task 一样继承自 Atom,也有 execute() 和 revert() 方法,多了一个 on_failure(history, *args, **kwargs) 方法,在 TaskFlow 执行或回滚发生错误时,使用前面执行异常的 history 信息返回重试策略,TaskFlow 根据重试策略执行对应的异常处理。
Retry 重试策略有三种:
REVERT:仅回滚 Retry 关联的 Subflow
REVERT_ALL:回滚整个 TaskFlow 流程
RETRY:重试关联的 Subflow
TaskFlow 同时提供了几个实现好的 Retry 派生类,开发者也可以直接使用这些类来处理异常:
AlwaysRevert 类:有异常发生时,回滚所在 Subflow
AlwaysRevertAll 类:异常发生时,回滚整个 TaskFlow
Times 类:异常发生时,重试 Subflow 指定次数
ForEach 类:异常发生时,使用一个预定义的参数列表重跑 Subflow,直到成功或定义的值用完为止
ParameterizedForEach 类:异常发生时,根据动态传入的参数列表重试 Subflow,直到成功或值用完为止
Engine:执行引擎,真正负责管理和执行 Flow/Task/Retry 的组件,有三种类型:
Serial:单线程,所有的 Task 都在 engine.run 的线程中运行
Parallel:调度没有依赖关系的 Task 到多个线程同步运行
Workers:调度 Task 到不同的 woker 中异步运行
工作流程
TaskFlow 流程可以分为几个阶段:定义 task、retry、flow 构造任务流;使用 engine 加载 flow;调用 engine.run() 触发执行;异步执行(可选)。

样例
import taskflow.engines from taskflow.patterns import linear_flow as lf from taskflow import task # INTRO: In this example we create two tasks, call Jim to ask Suzzie's # number, and then call Suzzie fail, this causes the workflow to enter the # reverting process class CallJim(task.Task): default_provides = 'suzzie_number' def execute(self, jim_number, *args, **kwargs): suzzie_phone = 666 print("Calling jim %s, get suzzie number %s." % (jim_number, suzzie_phone)) return suzzie_phone def revert(self, jim_number, *args, **kwargs): print("Calling jim %s and apologizing." % jim_number) class CallSuzzie(task.Task): def execute(self, suzzie_number, *args, **kwargs): raise IOError("Suzzie not home right now.") # Create your flow and associated tasks (the work to be done). flow = lf.Flow('simple-linear').add( CallJim(), CallSuzzie() ) try: # Now run that flow using the provided initial data (store below). taskflow.engines.run(flow, store=dict(jim_number=555)) except Exception as e: # NOTE(harlowja): This exception will be the exception that came out of the # 'CallSuzzie' task instead of a different exception, this is useful since # typically surrounding code wants to handle the original exception and not # a wrapped or altered one. print("Flow failed: %s" % e) |
执行结果(flow 不关联 retry 时默认为 AlwaysRevertAll):
Calling jim 555, get suzzie number 666. Calling jim 555 and apologizing. Flow failed: Suzzie not home right now. |
最后
TaskFlow 在保证可靠性的前提下极大简化了开发流程,将关注点集中在业务逻辑本身,代码更清晰,功能也更强大,更完善。同时由于它本身的复杂性,在任务构造时也需要遵循一些约定来充分发挥它的优势:
一个任务只做一件事
充分考虑任务失败的影响,在 revert 里定义回滚操作
显式定义任务执行完生成的 provides 参数,方便后续 task 调用
显示定义 task 运行需要的入参 requires,明确依赖关系和执行顺序
使用 parallel engine 时不要忽略任务执行时的线程安全
保持任务命名和异常返回信息的清晰准确




