暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

聊聊线程池(二)

purecpp 2021-12-01
446

本文是线程池系列文章的第二篇,这个系统会从一个简单多线程单队列的线程池讲起,接着介绍多线程多队列的线程池和work steal线程池,让读者对线程池的实现和应用有一个更深入的认识。

前文讲解过如何用几十行代码实现一个简单的多线程单队列线程池,本文讲接着介绍如何实现一个多线程多队列的线程池,即每个线程一个队列的线程池。在此之前我们需要对之前实现的简单线程池做一个重构。为什么要重构?为了复用代码。无论多线程单队列还是多线程多队列的线程池都有队列,如果把队列相关的代码抽取出来的话,那么就可以被多线程多队列的线程池复用了。

重构

将队列的代码抽取出来。

#pragma once

#include <condition_variable>
#include <mutex>
#include <queue>

template <typename T> class Queue {
public:
void push(const T &item) {
{
std::scoped_lock lock(mtx_);
queue_.push(item);
}
cond_.notify_one();
}

void push(T &&item) {
{
std::scoped_lock lock(mtx_);
queue_.push(std::move(item));
}
cond_.notify_one();
}

bool pop(T &item) {
std::unique_lock lock(mtx_);
cond_.wait(lock, [&]() { return !queue_.empty() || stop_; });
if (queue_.empty())
return false;
item = std::move(queue_.front());
queue_.pop();
return true;
}

std::size_t size() const {
std::scoped_lock lock(mtx_);
return queue_.size();
}

bool empty() const {
std::scoped_lock lock(mtx_);
return queue_.empty();
}

void stop() {
{
std::scoped_lock lock(mtx_);
stop_ = true;
}
cond_.notify_all();
}

private:
std::condition_variable cond_;
mutable std::mutex mtx_;
std::queue<T> queue_;
bool stop_ = false;
};

这个代码大部分来自于之前的线程池,通过模板做了一个泛化,让它可以放任意类型的item。现在这个队列同时也可以被其他类型的线程池复用了。

需要注意push方法,为什么提供了两个push方法,能不能合并成一个push方法呢?像这样:

  void push(T &&item) {
{
std::scoped_lock lock(mtx_);
queue_.push(std::forward<T>(item));
}
cond_.notify_one();
}

通过完美转发来统一左值引用和右值引用的push。

其实不能这样写,因为void push(T &&item)里面的T&&并不是一个universal reference, 而是一个右值引用。为什么?因为Queue是一个模板类,当构造这个queue的时候就已经发生了类型推导,所以push里面的T已经是一个确定的类型了,T&&就是一个右值引用类型了,因此我们还需要提供一个左值引用版本的push。

有了这个queue之后我们再写线程池就简单了。

#include "safe_queue.h"
#include <functional>
#include <thread>
#include <vector>

using WorkItem = std::function<void()>;
class SimplePool {
public:
explicit SimplePool(size_t threads = std::thread::hardware_concurrency()) {
for (size_t i = 0; i < threads; ++i)
workers_.emplace_back([this] {
for (;;) {
std::function<void()> task;
if (!queue_.pop(task)) {
return;
}

if (task) {
task();
}
}
});
}

void enqueue(WorkItem item) { queue_.push(std::move(item)); }
~SimplePool() {
queue_.stop();
for (auto &thd : workers_) {
thd.join();
}
}

private:
Queue<WorkItem> queue_;
std::vector<std::thread> workers_;
};

可以看到SimplePool的代码现在非常少了,enqueue的代码减少到一行了。

多线程多队列的线程池

接口设计

和之前线程池接口类似,只提供一个添加task的接口即可。

int schedule_by_id(std::function<void()> fn, size_t id = -1)

这里为什么不和之前一样提供一个enqueue接口呢?因为每个线程一个队列的场景和之前不太一样,一个task过来,我可以把它放到任意一个线程的队列中,我也可以把它放到指定的线程当中,所以这里的接口schedule_by_id,当用户不指定线程id的时候就随机选择一个线程,当用户指定了线程则调度到对应id的线程队列中。

实现

因为有了前面重构的safe queue,我们的实现可以变得比较简单。

#pragma once
#include "safe_queue.h"
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

class MultiplePool {
public:
explicit MultiplePool(
size_t thread_num = std::thread::hardware_concurrency())
: queues_(thread_num), thread_num_(thread_num) {
auto worker = [this](size_t id) {
while (true) {
std::function<void()> task{};
if (!queues_[id].pop(task)) {
break;
}

if (task) {
task();
}
}
};

workers_.reserve(thread_num);
for (size_t i = 0; i < thread_num; i++) {
workers_.emplace_back(worker, i);
}
}

int schedule_by_id(std::function<void()> fn, size_t id = -1) {
if (fn == nullptr) {
return -1;
}

if (id == -1) {
id = rand() % thread_num_;
queues_[id].push(std::move(fn));
} else {
assert(id < thread_num_);
queues_[id].push(std::move(fn));
}

return 0;
}

~MultiplePool() {
for (auto &queue : queues_) {
queue.stop();
}

for (auto &worker : workers_) {
worker.join();
}
}

private:
std::vector<Queue<std::function<void()>>> queues_;
size_t thread_num_;
std::vector<std::thread> workers_;
};

先来看构造函数,由于是每个线程一个队列,所以线程和队列是一一对应的,通过线程id将他们关联起来,线程只从对应的队列中取出任务执行即可。

接着看schedule_by_id函数,当用户不指定线程id的时候,就random方式选择出一个线程的队列,然后将task放进去;如果用户指定线程id的时候就放到对应id的队列中去。

析构函数停止线程池就比较简单了,停止每个queue和退出线程即可。

再来看看测试代码:

void test_simple_pool1() {
SimplePool pool;
pool.enqueue([] {std::cout << "hello\n"; });
std::string str = "world";
pool.enqueue([str=std::move(str)] {std::cout << "hello " + str<<'\n'; });
}

void test_multiple_pool1() {
MultiplePool pool;
pool.schedule_by_id([] {std::cout << "hello\n"; });
std::string str = "world";
pool.schedule_by_id([str = std::move(str)]{ std::cout << "hello " + str << '\n'; });
}

性能比较

这两种线程池在性能上哪个更好呢?写一个性能测试代码比较一下。

#include <iostream>
#include <chrono>
#include <atomic>
#include "simple_pool.h"
#include "multiple_pool.h"

class ScopedTimer {
public:
ScopedTimer(const char *name)
: m_name(name), m_beg(std::chrono::high_resolution_clock::now()) {}
~ScopedTimer() {
auto end = std::chrono::high_resolution_clock::now();
auto dur =
std::chrono::duration_cast<std::chrono::nanoseconds>(end - m_beg);
std::cout << m_name << " : " << dur.count() << " ns\n";
}

private:
const char *m_name;
std::chrono::time_point<std::chrono::high_resolution_clock> m_beg;
};

const size_t COUNT = 1'000'000;
const size_t REPS = 10;

void benchmark() {
std::atomic<int> count = 0;
ScopedTimer timer("SimplePool ");
{
SimplePool pool;
for (size_t i = 0; i < COUNT; i++) {
pool.enqueue([i, &count] {
count++;
int x = 0;
auto reps = REPS + (REPS + (rand() % 5));
for (size_t j = 0; j < reps; j++) {
x = i + rand();
}
});
}
}
assert(count == COUNT);
}

void benchmark1() {
std::atomic<int> count = 0;
ScopedTimer timer("MultiplePool");
{
MultiplePool pool;
for (size_t i = 0; i < COUNT; i++) {
pool.schedule_by_id([i, &count] {
count++;
int x = 0;
auto reps = REPS + (REPS + (rand() % 5));
for (size_t j = 0; j < reps; j++) {
x = i + rand();
}
});
}
}
assert(count == COUNT);
}

int main() {
for (size_t i = 0; i < 6; i++) {
benchmark();
benchmark1();
}
std::cout << "Hello, World!" << std::endl;
return 0;
}

用vs2019测试,MultiplePool的性能是SimplePool两倍,说明一个线程一个队列的线程池性能更好。

在下一篇文章中我将会介绍如何实现一个更快的线程池--worksteal线程池,敬请期待!



    出自:purecpp

    地址: www.purecpp.org

    转载请注明出处!


文章转载自purecpp,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论