暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Openstack工作原理——TaskFlow长流程运行机制

云拾 2018-11-08
4144

    TaskFlow 是 OpenStack 项目下的一个 python 公共库,给 OpenStack 和其他的 python 项目提供一个简单、统一、可靠、和可扩展的任务执行框架。


    它能够跟踪和控制应用程序中复杂逻辑任务的执行、暂停、重启、恢复以及回滚,保证任务执行的可靠性和一致性,同时提供了计量、执行记录、进度和状态追踪的能力,给 OpenStack 的长流程任务和复杂状态转换过程提供了一个统一的解决方案,目前包括 Cinder、Glance、Neutron、Octavia 等项目都使用了 TaskFlow 来管理各自复杂的业务流程,比如 create volume、import image 等。


create volume 流程



设计架构

    TaskFlow 本身的实现比较复杂,它把预先定义好的一系列小 Task 按一定顺序和规则组合成一个或多个 Workflow,经过 Engine 载入后同步执行或调度给执行节点异步执行,同时提供执行过程和任务状态的追踪能力,可以根据预定义策略对异常任务进行回滚或重试处理。



    TaskFlow 分为提供同步执行能力的基础部分(上图左半部分)和依赖外部支撑提供异步运行的可选部分(右半部分),目前 OpenStack 的组件主要用的还是基础部分的功能。


基本概念

Atom:原子,TaskFlow 里的最小执行单位,抽象基类,派生出Task、Retry等不同的任务类型。


Task:任务,定义了执行 execute() 和回滚 revert() 操作的工作单元,所有开发者自定义的 task 都必须继承 taskflow.task.Task 类。例子:



Flow:任务流,以一定的次序组织和运行 Task,失败时触发关联的 Retry 进行回滚。Flow 可以包含 Task,也可以嵌套其他 Flow,有三种类型:

  • Linear(linear_flow.Flow):线性流,task/flow 按定义的顺序依次执行,按倒序依次回滚

  • Unordered(unordered_flow.Flow):无序流,task/flow 可能按照任意的顺序执行和回滚

  • Graph(graph_flow.Flow):图流,task/flow 按定义时的 provides/requires 参数隐含依赖关系或者显式添加的 link 关系来执行和回滚


Retry:重试,和 Task 一样继承自 Atom,也有 execute() 和 revert() 方法,多了一个 on_failure(history, *args, **kwargs) 方法,在 TaskFlow 执行或回滚发生错误时,使用前面执行异常的 history 信息返回重试策略,TaskFlow 根据重试策略执行对应的异常处理。

    Retry 重试策略有三种:

  • REVERT:仅回滚 Retry 关联的 Subflow

  • REVERT_ALL:回滚整个 TaskFlow 流程

  • RETRY:重试关联的 Subflow


    TaskFlow 同时提供了几个实现好的 Retry 派生类,开发者也可以直接使用这些类来处理异常:

  • AlwaysRevert 类:有异常发生时,回滚所在 Subflow

  • AlwaysRevertAll 类:异常发生时,回滚整个 TaskFlow

  • Times 类:异常发生时,重试 Subflow 指定次数

  • ForEach 类:异常发生时,使用一个预定义的参数列表重跑 Subflow,直到成功或定义的值用完为止

  • ParameterizedForEach 类:异常发生时,根据动态传入的参数列表重试 Subflow,直到成功或值用完为止


Engine:执行引擎,真正负责管理和执行 Flow/Task/Retry 的组件,有三种类型:

  • Serial:单线程,所有的 Task 都在 engine.run 的线程中运行

  • Parallel:调度没有依赖关系的 Task 到多个线程同步运行

  • Workers:调度 Task 到不同的 woker 中异步运行


工作流程

    TaskFlow 流程可以分为几个阶段:定义 task、retry、flow 构造任务流;使用 engine 加载 flow;调用 engine.run() 触发执行;异步执行(可选)。


样例

import taskflow.engines

from taskflow.patterns import linear_flow as lf

from taskflow import task


# INTRO: In this example we create two tasks, call Jim to ask Suzzie's

# number, and then call Suzzie fail, this causes the workflow to enter the

# reverting process


class CallJim(task.Task):

    default_provides = 'suzzie_number'

    def execute(self, jim_number, *args, **kwargs):

        suzzie_phone = 666

        print("Calling jim %s, get suzzie number %s." %

              (jim_number, suzzie_phone))

        return suzzie_phone

    def revert(self, jim_number, *args, **kwargs):

        print("Calling jim %s and apologizing." % jim_number)


class CallSuzzie(task.Task):

    def execute(self, suzzie_number, *args, **kwargs):

        raise IOError("Suzzie not home right now.")


# Create your flow and associated tasks (the work to be done).

flow = lf.Flow('simple-linear').add(

    CallJim(),

    CallSuzzie()

)


try:

    # Now run that flow using the provided initial data (store below).

    taskflow.engines.run(flow, store=dict(jim_number=555))

except Exception as e:

    # NOTE(harlowja): This exception will be the exception that came out of the

    # 'CallSuzzie' task instead of a different exception, this is useful since

    # typically surrounding code wants to handle the original exception and not

    # a wrapped or altered one.

    print("Flow failed: %s" % e)

执行结果(flow 不关联 retry 时默认为 AlwaysRevertAll):

Calling jim 555, get suzzie number 666.

Calling jim 555 and apologizing.

Flow failed: Suzzie not home right now.


最后

    TaskFlow 在保证可靠性的前提下极大简化了开发流程,将关注点集中在业务逻辑本身,代码更清晰,功能也更强大,更完善。同时由于它本身的复杂性,在任务构造时也需要遵循一些约定来充分发挥它的优势:

  • 一个任务只做一件事

  • 充分考虑任务失败的影响,在 revert 里定义回滚操作

  • 显式定义任务执行完生成的 provides 参数,方便后续 task 调用

  • 显示定义 task 运行需要的入参 requires,明确依赖关系和执行顺序

  • 使用 parallel engine 时不要忽略任务执行时的线程安全

  • 保持任务命名和异常返回信息的清晰准确 

文章转载自云拾,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论