暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

C++ Actor Framework(CAF)介绍

purecpp 2021-05-30
2518

在介绍C++ Actor Framework(CAF):

https://github.com/actor-framework/actor-framework

之前需要先对actor mode的一些概念做一个简单的介绍。

什么是actor mode

actor是基本的计算单元,它接收一条消息并基于它进行计算。每个actor有自己的信箱(mailbox),actor之间通过发消息通信,避免了并发多线程操作。

CAF的一点历史

CAF第一个版本于2012年,至今仍然在发展当中,该库作者曾经希望CAF能作为boost.actor进入boost库,可惜boost社区对其不感兴趣,后来CAF独立发展。

CAF中的actor

CAF中的actor和actor模型是对应的,它是一个基本的计算单元,具体来说是一个函数或者一个类。

作为函数的actor

behavior adder() {
return {
[](int x, int y) {
return x + y;
},
[](double x, double y) {
return x + y;
}
};
}

在CAF中返回类型为behavior的函数就是一个函数式actor。

作为类的actor

class calculator : public event_based_actor {
public:
calculator(actor_config& cfg) : event_based_actor(cfg) {
// nop
}


behavior make_behavior() override {
return {
[](int x, int y) {
return x + y;
},
[](double x, double y) {
return x + y;
}
};
}
};

重写了make_behavior的类就是一个类类型的actor。

dynamic actor和static actor

behavior dynamic_actor_func() {
return {
[](int x, int y) {
return x + y;
}
};
}


using calculator_actor
= typed_actor<result<int32_t>(int32_t, int32_t)>;


calculator_actor::behavior_type typed_calculator_fun() {
return {
[](int32_t a, int32_t b) { return a + b; }
};
}

static typed actor和动态actor的差别在于它可以做编译期类型检查。

进程内actor之间通信

behavior adder() {
return {
[](int x, int y) {
return x + y;
},
[](double x, double y) {
return x + y;
}
};
}


int main() {
actor_system_config cfg;
actor_system sys{cfg};
// Create (spawn) our actor.
auto a = sys.spawn(adder);
// Send it a message.
scoped_actor self{sys};
self->send(a, 40, 2);
// Block and wait for reply.
self->receive(
[](int result) {
cout << result << endl; // prints “42”
}
);
}

actor_system spawn创建了一个actor,后面调用send方法向actor发消息,recieve则打印actor执行的结果, scoped_actor保证出了作用域之后释放actor。上面的例子是阻塞执行的,因为receive是同步阻塞的,非阻塞执行:

auto a = sys.spawn(adder);
sys.spawn(
[=](event_based_actor* self) -> behavior {
self->send(a, 40, 2);
return {
[=](int result) {
cout << result << endl;
self->quit();
}
};
}
);

另外一种非阻塞调用的方式是使用request-then链式调用的api:

auto a = sys.spawn(adder);
sys.spawn(
[=](event_based_actor* self) {
self->request(a, seconds(1), 40, 2).then(
[=](int result) {
cout << result << endl;
self->quit();
}
};
}
);

进程间actor通信

server

int main(int argc, char** argv) {
// Defaults.
auto host = "localhost"s;
auto port = uint16_t{42000};
auto server = false;
actor_system sys{...}; // Parse command line and setup actor system.
auto& middleman = sys.middleman();
actor a;
a = sys.spawn(adder);
auto bound = middleman.publish(a, port);
if (bound == 0)
return 1;
}

client

int main(int argc, char** argv) {
// Defaults.
auto host = "localhost"s;
auto port = uint16_t{42000};
auto server = false;
actor_system sys{...}; // Parse command line and setup actor system.
auto& middleman = sys.middleman();
actor a;
auto r = middleman.remote_actor(host, port);
if (!r)
return 1;
a = *r;


sys.spawn(
[=](event_based_actor* self) {
self->request(a, seconds(1), 40, 2).then(
[=](int result) {
cout << result << endl;
self->quit();
}
};
);
}

caf通过middleman去创建server/client actor,publish接口监听一个端口,middleman.remote_actor则通过ip和端口去连接,返回actor,后面actor通信和之前进程内actor通信一样。

caf线程模型

caf actor_system初始化时会根据核数创建对应的worker,每个worker一个线程和一个队列,创建的actor会放到队列中,线程去消费队列,每个线程先消费自己的队列,当队列为空时会做work steal去消费其他线程的队列的actor。

caf消息模型

caf内置了大量的基础类型,在初始化的时候会去注册这些类型,以便actor处理消息的时候做序列化/反序列化。对于自定义消息则需要通过几个宏去定义并添加到caf的类型系统中,这里稍显麻烦。

定义custom消息类型

// --(rst-type-id-block-begin)--
struct foo;
struct foo2;


CAF_BEGIN_TYPE_ID_BLOCK(custom_types_1, first_custom_type_id)


CAF_ADD_TYPE_ID(custom_types_1, (foo))
CAF_ADD_TYPE_ID(custom_types_1, (foo2))
CAF_ADD_TYPE_ID(custom_types_1, (std::pair<int32_t, int32_t>) )


CAF_END_TYPE_ID_BLOCK(custom_types_1)
// --(rst-type-id-block-end)--

先定义了一个父类型custom_types_1,在它下面添加一个或多个具体的消息类型。自定义类型由父类型和子类型组成,允许定义多个具体的消息子类型,父类型用于类型注册用。

在main中注册自定义消息(不注册会编译不过或crash)

caf::exec_main_init_meta_objects<custom_types_1>();

发送指针

如果要向多个actor发送一个长消息的时候可以发送caf的智能指针避免消息的拷贝。

auto heavy = vector<char>(1024 * 1024);
auto msg = make_message(move(heavy));
for (auto& r : receivers)
self->send(r, msg);

这里通过make_message创建了一个msg的智能指针(caf内部实现了引用计数)。

copy-on-write

caf提供了消息的copy-on-write机制,语义就是在需要修改消息内容的时候才copy,caf是怎么实现copy on write的呢?

caf的copy on write有两个条件:

  1. 消息的引用计数大于1

  2. 以非常量引用方式从msg中取值

为什么是这两个条件呢,因为引用计数大于一表明其他actor也在引用这个消息,只有在有多个对象引用该消息时copy才有意义,没有其它对象引用则永远不需要copy了。另外以非常量引用从msg中取值的时候表明用户的意图是想修改数据,这时候就需要copy了。当然你也可以认为这是caf的一个约定,也许用户不小心漏掉了const那就会自动copy了,这也算是copy on write的一个坑。

那么caf具体是怎么实现copy on write的呢?

auto heavy = vector<char>(1024 * 1024);
auto msg = make_message(move(heavy));
for (auto& r : receivers)
self->send(r, msg);


behavior reader() {
return {
[=](const vector<char>& buf) {
f(buf);
}
};
}




//copy on writebehavior writer() {
return {
[=](vector<char>& buf) {
f(buf);
}
};
}

caf会先判断actor参数是否为非常量引用类型,如果是非常量引用类型就会构造一个非常量引用的msg view,反之则会创建一个const msg view,当解析发送的消息时会调用对应的msg view构造函数,非常量引用的msg view则会调用msg的ptr()函数,在该函数中做copy,如果是const msg view则不会做copy。

  ///message.hpp
detail::message_data* ptr() noexcept {
return data_.unshared_ptr();
}


/// @private
const detail::message_data* ptr() const noexcept {
return data_.get();
}


/// intrusive_cow_ptr.hpp
/// Returns a mutable pointer to the managed object.
pointer unshared_ptr() {
return intrusive_cow_ptr_unshare(ptr_.ptr_);
}


template <class T>
T* default_intrusive_cow_ptr_unshare(T*& ptr) {
if (!ptr->unique()) {
auto new_ptr = ptr->copy();
intrusive_ptr_release(ptr);
ptr = new_ptr;
}
return ptr;
}

总结

caf通过behavior定义actor,actor systm可以创建大量的actor放到线程的队列中,通过worksteal提升actor的处理效率;actor通过发消息避免了并发的竞争,actor之间发送消息支持copy on write。节点间的actor通过middleman来发消息,需要监听端口和显式ip,port去连接。

caf在进程内之间发消息处理得比较好(支持worker steal,copy-on-write),但是在分布式方面比较弱,如:的分布式actor目前没有处理actor计算调度,也没有处理fail over,这些需要上层应用去处理。

本文对caf做了一个简要的介绍,如果希望更深入去了解这个框架可以查看caf的源码。

后续还会介绍另外一个知名的c++ 分布式计算框架:ray,也会将ray和caf做一些对比。


文章转载自purecpp,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论