1.6 MiniOB 线程模型
本节主要介绍 MiniOB 的底层数据结构线程池,线程池是做并发处理时非常常用的手段。希望通过本节内容的学习,大家在做 MiniOB 训练需要阅读代码时,不用花费特别多的精力去挖掘相关技术内容。
SEDA 框架
首先回顾一下 MiniOB 框架,MiniOB 有很多处理阶段,如词法解析、语法解析、Resolver、优化器等。MiniOB 使用了 SEDA 的框架,像 Resolver、优化器等在 SEDA 中都是一个个的 Stage,各个 Stage 之间通过事件来传递数据。MiniOB 的多线程能力是基于 SEDA 来做的,SEDA 本身就是支持线程池的。
下图所示是一个简单的 SEDA 框架图,能够很好的帮助理解 MiniOB 中的每个 Stage 和 event,以及线程池之间的关系。线程池可以有多个,每个线程池可以包含多个 Stage,一个 Stage 会绑定到某一个线程池里面。每个线程池可以设置自己的线程的个数、事件队列的大小,线程池中的多个 Stage 之间是通过事件通讯的。虽然示例图中写的都是 StageEvent,但也可以是不同的名字。另外,不同的线程池之间的 Stage 也可以通过事件通讯,并且每一个线程池里面的 Stage 可以是不同的。
SEDA 框架有一个好处是可以将 Stage 进行分类,比如把处理 SQL 的分作一类,把 IO 处理的分作为一类,并把不同的分类分配到不同的线程池。这样在处理任务时不容易受到其他干扰,并且也能够根据真实的业务场景去调整每个线程池的大小。
接下来看一个简单的 Stage 模型。一个 Stage 对应一个类型的事件,事件是放在线程池事件队列中的。线程池事件指当事件队列有事件后,就将其交给对应的 Stage 来处理。
事件的生命周期
创建 event 的时候,它就会被加到一个线程事件队列里面(参考 net/server.cpp 中 session_stage_->add_event(sev);)。添加到事件队列里面之后,就会由 Stage 调用 handle_event 去处理(参考 session/session_stage.cpp 中 SessionStage::handle_event)。处理完成之后,event 就结束了。
事件在处理的时候,还可以增加一些回调函数(比如 session/session_stage.cpp 中的 sev->push_callback(cb);),后续将事件交给其它的 Stage 做处理,处理完成后就会调用回调函数。也可以在回调函数中做一些任务处理,在代码里可以看到很多,比如 done_immediate 就是立即处理完成(比如 parser/parse_stage.cpp 中的 event->done_immediate())。
MiniOB 事件执行过程
客户端通过网络发送一个请求到服务端,服务端在接收到消息后会创建一个消息包,并创建对应的 SessionEvent。然后调用 add_event 把 SessionEvent 添加到 SessionStage 里(参考 net/server.cpp 中 session_stage_->add_event(sev);),再放到对应的事件队列中。SessionStage 以及后续处理事件的几个 Stage,都在 SQLThreads 线程池中。
SessionStage 处理 SessionEvent 后,会创建一个 SQLStageEvent 传递给后面的 Stage。后续的 Stage 都是处理这个 SQLStageEvent 的事件。如下图所示,事件经过一个流转,处理完成后返回结果给客户端。
MiniOB 代码解读
接下来介绍一下 MiniOB 代码中的 SEDA 框架、线程池以及各个 Stage。如下所示,在配置文件 etc/observer.ini 中可以很清晰的看到 MiniOB 有哪些线程池(SQL、IO 和 Default)以及 Stage。每个 Stage 可以配置自己位于哪个线程池、关联的下一个 Stage 以及所属线程池,如果不配置,则使用默认的线程池。
[SQLThreads] # 线程的个数,0 就会用 CPU 的个数 count=3 [IOThreads] count=3 [DefaultThreads] # 如果 Stage 没有配置线程池,就会用这个默认的线程池 count=3 [SessionStage] ThreadId=SQLThreads NextStages=PlanCacheStage [PlanCacheStage] ThreadId=SQLThreads #NextStages=OptimizeStage NextStages=ParseStage [ParseStage] ThreadId=SQLThreads NextStages=ResolveStage [ResolveStage] ThreadId=SQLThreads NextStages=QueryCacheStage [QueryCacheStage] ThreadId=SQLThreads NextStages=OptimizeStage [OptimizeStage] ThreadId=SQLThreads NextStages=ExecuteStage [ExecuteStage] ThreadId=SQLThreads NextStages=DefaultStorageStage,MemStorageStage [DefaultStorageStage] ThreadId=IOThreads BaseDir=./miniob SystemDb=sys [MemStorageStage] ThreadId=IOThreads [MetricsStage] NextStages=TimerStage
初始化阶段时先读取配置文件 etc/observer.ini,然后创建各个 Stage。 prepare_init_seda 函数中创建了很多静态的 StageFactory。这里根据名字可以关联到配置文件中的相关配置,并且提供了一个创建 Stage 对象的函数,这些 Stage 会有一些共同的接口。
SEDA 框架本身的初始化函数是 seda_config.cpp 中的 SedaConfig::init 函数。这里会创建默认线程池和一些 Stage,比如 SEDA 框架自带的 Metric Stage。接着把各个线程池、各个 Stage 做关联和启动,参考 SedaConfig::init_stages@seda_config.cpp ,然后做 Stage 对象的实例化。最后按照配置文件,通过刚才创建的静态 StageFactory 对象注册的一些 Stage 对象,执行初始化、启动。
// MiniOB 的初始化函数,在 init.cpp 中
int init(ProcessParam *process_param)
{
...
// seda is used for backend async event handler
// the latency of seda is slow, it isn't used for critical latency
// environment.
prepare_init_seda();
rc = init_seda(process_param);
if (rc) {
LOG_ERROR("Failed to init seda configuration!");
return rc;
}
....
}
// MiniOB 初始化前,执行的 prepare 动作,在 init.cpp 文件中
int prepare_init_seda()
{
static StageFactory session_stage_factory("SessionStage", &SessionStage::make_stage);
static StageFactory resolve_stage_factory("ResolveStage", &ResolveStage::make_stage);
....
return 0;
}
接下来以 SessionStage 为例介绍一下 Stage。
make_stage 就是创建一个 Stage 对象,参考
SessionStage::make_stage@session_stage.cpp,这里就对应了 StageFactory 的创建对应的接口。set_properties 接口可以从配置文件中读取一些配置数据,参考
SessionStage::set_properties@session_stage.cpp,不过 SessionStage 没有任何配置。SessionStage::initialize是 Stage 的初始化接口,这里可以做一些初始化动作。SessionStage::handle_event是 Stage 处理事件的关键接口,当有对应事件到达时,SEDA 就会调用此接口。
对事件回调函数的使用可以参考 SessionStage::handle_request,在调用下一个 Stage 之前,先创建一个 callback,然后 push_back 到 event 回调队列中,代码参考:
CompletionCallback *cb = new (std::nothrow) CompletionCallback(this, nullptr);
if (cb == nullptr) {
sev->done_immediate();
return;
}
sev->push_callback(cb);SEDA 框架是一个非常优秀的线程池事件处理框架,它的扩展性、灵活度都非常高。感兴趣的同学可以去 deps/common/seda 目录下查看源码。




