暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

聊聊线程池(一)

purecpp 2021-11-28
810

本文是线程池系列文章的第一篇,会从一个简单多线程单队列的线程池讲起,后面还会介绍多线程多队列的线程池和work steal线程池,让读者对线程池的实现和应用有一个更深入的认识。

用现代c++的线程和锁去实现一个简单的线程池非常

简单,不过几十行代码就可以了。来看看一个简单的线程池的实现到底有多简单。

基本实现思路在构造threadpool的时候初始化线程数,在析构函数中停止线程池,对外只需要提供一个enqueue task的接口就够了,至于其它的一些查询接口如size,empty等接口可以根据需要增加。

一个简单的线程池的基本接口像这样:

  explicit ThreadPool(size_t thread_num = std::thread::hardware_concurrency());

template <class F, class... Args>
auto enqueue(F &&f, Args &&...args);

接口设计

返回类型

这个enqueue接口返回的实际上是一个future,以便满足需要eager获取异步任务执行结果的需求,如果不需要eager取值的时候则可以忽略这个返回值,这样的接口比较灵活,可以满足通用和专用的需求。

这个future的类型是函数的返回类型:

std::future<std::invoke_result_t<F, Args...>>

可以看到这个future返回的类型实际上是task的返回类型,既可以是void类型也可以是非void类型。

所以这个唯一的enqueue接口同时处理了void和非void返回类型的情况,eager求值和非eager求值的场景,简单易用。

输入参数

输入参数是一个可调用调用对象及其参数,这里之所有没有使用std::function<void()>作为enqueue的参数,是为了方便用户使用。如果接口参数是std::function<void()>的时候,对于有参数的task则需要通过capture或者bind去转换为统一的std::function<void()>类型,这对于用户来说有时候不太方便,让用户输入函数和参数的方式更加方便和自然。

具体实现

来看看一个简单的threadpool的声明:

class ThreadPool {
public:
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency());
template <class F, class... Args>
auto enqueue(F &&f, Args &&...args);
~ThreadPool();

private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};

先来看看它有哪些成员,有一个线程组和一个队列,所以这个简单的线程池是多线程从一个队列中抢任务的模式,也有一些线程池是一个线程一个队列,关于这类线程池在后面会介绍,本文先只介绍多线程单队列的线程池。

线程池还提供了锁和条件变量,这是因为刚开始线程池中是没有任务的,所有的线程都在等待任务的到来,当一个任务入队到线程池中的时候,需要通知线程去处理到来的任务,这就需要条件变量和和锁来实现线程通知机制了。

还有一个变量是stop用户来在析构threadpool的时候停止和清理任务和线程的,这里为什么不提供一个stop接口呢?因为线程池的生命周期基本上和应用的生命周期一致,线程池stop之后也不可能再重新restart了,所以,没必要再提供一个专门的stop接口了,直接在析构的时候stop即可。

接下来一个个函数来看看具体是怎么实现的:

初始化线程池

inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;

{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}

task();
}
});
}

先根据threads数量创建线程多个线程,每个线程都会等待两个条件:线程池不为空或者线程池stop,如果线程池stop并且task都处理完了则退出线程,这里是为了在stop标志设置后执行完(排空队)列中的任务。

当队列中有任务的时候会收到线程池的通知,线程被唤醒,然后从队列中取出任务执行,这里的一个细节是注意加锁的范围,task取出之后就应该解锁了。

enqueue task

template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&...args) {
using return_type = std::invoke_result_t<F, Args...>;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);

if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");

tasks.emplace([task = std::move(task)] { (*task)(); });
}
condition.notify_one();
return res;
}

enqueue里面主要是通过std::package_task来获得一个函数执行的future返回出去,这里同时也处理了泛化函数到std::function<void()>的类型统一,避免了由用户在外面去做这个事情。这里有两个细节,一个是加锁范围,在入队之后就可以解锁了;另外一个细节是通过condition.notify_one()而不是notify_all去通知某一个线程去处理任务,因为是多个线程抢一个队列中的任务,一次只会有一个线程能抢到任务,如果通过notify_all去唤醒所有的线程是没有意义的,并且引起线程竞争,所以这里只需要唤醒一个线程去处理任务就够了。

析构(停止)线程池

inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers)
worker.join();
}

线程池析构主要是清理线程和任务,这里只需要把stop标志设置为true之后,通过notify_all唤醒所有线程即可,注意这里只能用notify_all而不是notify_one,因为是要停止所有线程而不是某一个线程。

由于线程在等待stop标志,所以当唤醒之后会把队列中的任务取出执行直到队列为空的时候才退出线程。

更多的细节

关于条件变量wait的一个细节是linux的虚假唤醒(spurious awakenings),std::condition_variable有两个wait接口,一个是带predict条件的wait,一个是不带条件的wait接口,使用不带条件的wait接口的时候需要注意虚假唤醒,需要这样写:

while (!stop) {
wait(lock);
}

即使wait因为操作系统的虚假唤醒,它还会通过while循环继续去判断一下stop是不是设置为true了,如果没有则会继续wait,从而避免了虚假唤醒可能引起的bug。

如果调用wait(lock, stop)则没有这个问题,因为它内部就是while-wait。

这个多线程单队列的线程池是不是很简单,总共不过几十行代码而已。但有时候在多线程去enqueue task的时候单线程线程池的性能就不是太好了,这时候就需要多线程多队列的线程池了,即一个线程一个队列,避免激烈的线程竞争,从而在多线程入队的时候获得更好的性能。在下一篇文章中将介绍这种线程池,敬请期待!



    出自:purecpp

    地址: www.purecpp.org

    转载请注明出处!


文章转载自purecpp,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论