暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Sogou C++ Workflow的Task理念

purecpp 2020-11-28
3278


[Workflow](https://github.com/sogou/workflow)是搜狗最近开源的一款C++后端异步引擎,计算与通信无缝结合,支撑搜狗几乎所有后端C++在线服务,包括所有搜索服务,云输入法,在线广告等,每日处理超百亿请求。

不仅包括通信、计算、文件IO、定时器、计数器等异步资源,而且创新性引入的任务流概念,使得开发者得以把异步任务组装起来实现复杂的业务逻辑。内部自带多种通用协议,跨平台、支持多种操作系统,自带服务治理与负载均衡。是一个并行计算与异步通信融为一体的编程范式。

本文着重介绍Workflow的一项重要理念:Task。

一切皆是Task

在Workflow的编程范式中,程序=协议+算法+任务流,这三部分应该是可以独立开发的。前文提到,我们提供了多种异步资源,比如计算、通信、文件IO等,这些异步资源以一种统一的方式提供给用户使用,那就是Task。

一个算法调用、一种协议通信等,都是Task。实际上,一切可以独立完成某项任务的、边界清晰明确的代码模块,都可以抽象为一个Task。

Task之间可以互相组合,从而可以由简单Task构建出复杂Task,进而得以组建任务流。 

接下来我们从Client-Server的角度来看一下:

Client请求是Task

使用Workflow作为Client,十分方便。 例如,创建一个HTTP请求,可以这么写:

static void callback(WFHttpTask* task)
{
...
}


auto *task = WFTaskFactory::create_http_task("http://www.sogou.com", 1, 3, callback);


task->start();




这条语句创建了一个对`http://www.sogou.com`的请求,允许1次重定向,最多可以重试3次,通信结束后自动调用callback。callback结束后Task执行完毕,自动销毁。这个请求HTTP请求被包装为一个Task返回给用户,用户具体的操作都可以在这个Task上进行。

网络通信上,除了`HTTP`,Workflow还实现了`Redis`协议、`MySQL`协议、`Kafka`协议等。

而在计算上,我们也对计算调度进行封装,使得用户可以使用`WFThreadTask`来创建算法任务,任何一次边界清晰的复杂计算,都应该包装成算法,而我们内部也提供了一些通用算法。

Task大多是通过工厂类`WFTaskFactory`的静态方法创建的。

Server处理对象也是Task

上面的程序片段已经展示了如何发起一个HTTP请求,下面的程序说明如何构建一个HTTP Server。

void process(WFHttpTask *task)
{
task->get_resp()->append_output_body("<p>Hello</p>");
}


WFHttpServer server(process);
server.start(8080);

Workflow的理念是一切皆是Task,作为Server,也可以用Task的方式实现。 `server.start(8080)`之后,当有客户端对8080端口发起HTTP请求时, Workflow会生成一个Task,并调用`void process(WFHttpTask *)`来处理这个Task。因此,用户只要在process中实现服务逻辑,即可构建出一个服务器。

复杂的服务器逻辑,很有可能需要Client Task的帮助,比如一个代理服务器,收到Client的请求之后,需要向真实的Server发出请求,并将应答回复给Client, 这种Task之间的组合,正是Workflow擅长的地方。

Task间的相互组合
如何使用Task构建异步任务流,实现程序业务逻辑?Workflow支持常用的串并联,也支持更加复杂的DAG结构。

串并联

auto *parallel = Workflow::create_parallel_work(nullptr);


auto *t1 = WFTaskFactory::create_http_task("http://www.sogou.com", 1, 3, nullptr);
auto *t2 = WFTaskFactory::create_go_task("A", some_function_A);
auto *t3 = WFTaskFactory::create_go_task("B", some_function_B;


auto *t4 = WFTaskFactory::create_redis_task("redis://127.0.0.1:6379/1", 3, nullptr);


parallel->add_series(Workflow::create_series_work(t1, nullptr));
parallel->add_series(Workflow::create_series_work(t2, nullptr));


auto *series = Workflow::create_series_work(t1, nullptr);
series->push_back(parallel);
series->push_back(t4);


series->start();

上面的代码,实现了下图中的先后次序:
当t1的callback执行结束时,t2和t3将会开始运行;
当t2和t3的callback都执行结束时,t4将会开始运行。

          |--> t2 ---|
--> t1 ---| |--> t4 ---
|--> t3 ---|





这种串并联机制可以应对大多数简单的场景。这里需要引入两个概念:
SeriesWork表示串行
ParallelWork表示并行

同时,SereisWork和ParallelWork有以下规则:
1.SeriesWork由若干Task串联而成,任何运行中的Task都属于某个SeriesWork;
2. ParallelWork由若干SeriesWork并联而成;
3. ParallelWork是一种Task。

因此,workflow的Task就可以无限复杂地组合下去。

动态地串并联

串并联除了可以静态地组装,也可以动态地创建。例如:

auto *t1 = WFTaskFactory::create_http_task("http://www.sogou.com", 1, 3, [](WFHttpTask * task) {
auto *t2 = WFTaskFactory::create_go_task("", [](){});
auto *series = series_of(task);
series->push_back(t2);
});


t1->start();

由于Task必然是属于某个SeriesWork,因此可以在Task的callback中拿到这个SeriesWork,向其添加新的Task。

总结

Workflow的Task理念是一种完备的编程范式,统一了各种异步资源及其上的算法、协议、任务流。有了这个层次,才得以巧妙地将各种异步过程结合起来,方便用户使用。

Workflow还有其它有趣的特色,后面会继续介绍。在Workflow的[主页](https://github.com/sogou/workflow)上,也有十分详细的进一步的文档介绍,欢迎各位访问。

文章转载自purecpp,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论