在 C++11 中实现一个简单的通用线程池
Implementing a simple, generic thread pool in C++11
我想创建一个线程池用于实验目的(以及有趣的因素)。它应该能够处理各种各样的任务(所以我可能会在以后的项目中使用它)。
在我的线程池中 class 我将需要某种任务队列。由于标准库提供 std::packaged_task
自 C++11 标准以来,我的队列将看起来像 std::deque<std::packaged_task<?()> > task_queue
,因此客户端可以通过某种 [=55] 将 std::packaged_task
推入队列=] 接口函数(然后池中的一个线程将被通知一个条件变量来执行它,等等)。
我的问题与双端队列中 std::packaged_task<?()>
的模板参数有关。
函数签名 ?()
应该能够处理任何 type/number 参数,因为客户端可以做类似的事情:
std::packaged_task<int()> t(std::bind(factorial, 342));
thread_pool.add_task(t);
这样我就不用处理type/number个参数了
但是 return 值应该是多少?(因此是问号)
如果我将整个线程池 class 设为模板 class,一个实例
它将只能处理具有特定签名的任务
(比如 std::packaged_task<int()>
)。
我希望一个线程池对象能够处理任何类型的任务。
如果我使用 std::packaged_task<void()>
并调用函数
return是一个整数,或者任何东西,那就是未定义的行为。
我碰巧有一个实现可以做到这一点。我的做法是将 std::packaged_task
对象包装在一个抽象出 return 类型的结构中。将任务提交到线程池的方法 return 是结果的未来。
这种方法可行,但由于每个任务都需要内存分配,因此它不适合非常短且非常频繁的任务(我试图用它来并行化流体模拟的块,但开销是太高了,对于 324 个任务来说大约是几毫秒)。
关键部分是这个结构:
struct abstract_packaged_task
{
template <typename R>
abstract_packaged_task(std::packaged_task<R> &&task):
m_task((void*)(new std::packaged_task<R>(std::move(task)))),
m_call_exec([](abstract_packaged_task *instance)mutable{
(*(std::packaged_task<R>*)instance->m_task)();
}),
m_call_delete([](abstract_packaged_task *instance)mutable{
delete (std::packaged_task<R>*)(instance->m_task);
})
{
}
abstract_packaged_task(abstract_packaged_task &&other);
~abstract_packaged_task();
void operator()();
void *m_task;
std::function<void(abstract_packaged_task*)> m_call_exec;
std::function<void(abstract_packaged_task*)> m_call_delete;
};
如您所见,它通过使用带有 std::function
和 void*
的 lambda 来隐藏类型依赖性。如果您知道所有可能出现的 std::packaged_task
对象的最大大小(我根本没有检查该大小是否依赖于 R
),您可以尝试通过删除内存分配来进一步优化它。
提交到线程池的方法然后是这样的:
template <typename R>
std::future<R> submit_task(std::packaged_task<R()> &&task)
{
assert(m_workers.size() > 0);
std::future<R> result = task.get_future();
{
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_task_queue.emplace_back(std::move(task));
}
m_queue_wakeup.notify_one();
return result;
}
其中 m_task_queue
是 abstract_packaged_task
结构的 std::deque
。 m_queue_wakeup
是一个 std::condition_variable
来唤醒工作线程来接任务。工作线程实现非常简单:
void ThreadPool::worker_impl()
{
std::unique_lock<std::mutex> lock(m_queue_mutex, std::defer_lock);
while (!m_terminated) {
lock.lock();
while (m_task_queue.empty()) {
m_queue_wakeup.wait(lock);
if (m_terminated) {
return;
}
}
abstract_packaged_task task(std::move(m_task_queue.front()));
m_task_queue.pop_front();
lock.unlock();
task();
}
}
你可以在我的 github.
上查看完整的 source code and the corresponding header
所以困难的部分是 packaged_task<R()>
是只移动的,否则你可以把它扔到 std::function<void()>
和 运行 你的线程中。
有几种解决方法。
首先,可笑的是,用一个packaged_task<void()>
存储一个packaged_task<R()>
。我建议不要这样做,但它确实有效。 ;)(operator()
在 packaged_task<R()>
上的签名是什么?您传递给 packaged_task<void()>
的对象所需的签名是什么?)
其次,将您的 packaged_task<R()>
包装在 shared_ptr
中,将其捕获在具有签名 void()
的 lambda 中,将其存储在 std::function<void()>
中,然后完成。这有间接费用,但可能比第一个解决方案少。
最后,编写您自己的仅移动函数包装器。对于签名 void()
,它很短:
struct task {
template<class F,
class dF=std::decay_t<F>,
class=decltype( std::declval<dF&>()() )
>
task( F&& f ):
ptr(
new dF(std::forward<F>(f)),
[](void* ptr){ delete static_cast<dF*>(ptr); }
),
invoke([](void*ptr){
(*static_cast<dF*>(ptr))();
})
{}
void operator()()const{
invoke( ptr.get() );
}
task(task&&)=default;
task&operator=(task&&)=default;
task()=default;
~task()=default;
explicit operator bool()const{return static_cast<bool>(ptr);}
private:
std::unique_ptr<void, void(*)(void*)> ptr;
void(*invoke)(void*) = nullptr;
};
简单。上面可以为任何类型 R
存储 packaged_task<R()>
,并在以后调用它们。
这具有相对最小的开销 -- 它应该比 std::function
便宜,至少是我见过的实现 -- 除了它不执行 SBO(小缓冲区优化),它存储小函数对象在内部而不是在堆上。
如果需要,您可以通过小的缓冲区优化来改进 unique_ptr<> ptr
容器。
我想创建一个线程池用于实验目的(以及有趣的因素)。它应该能够处理各种各样的任务(所以我可能会在以后的项目中使用它)。
在我的线程池中 class 我将需要某种任务队列。由于标准库提供 std::packaged_task
自 C++11 标准以来,我的队列将看起来像 std::deque<std::packaged_task<?()> > task_queue
,因此客户端可以通过某种 [=55] 将 std::packaged_task
推入队列=] 接口函数(然后池中的一个线程将被通知一个条件变量来执行它,等等)。
我的问题与双端队列中 std::packaged_task<?()>
的模板参数有关。
函数签名 ?()
应该能够处理任何 type/number 参数,因为客户端可以做类似的事情:
std::packaged_task<int()> t(std::bind(factorial, 342));
thread_pool.add_task(t);
这样我就不用处理type/number个参数了
但是 return 值应该是多少?(因此是问号)
如果我将整个线程池 class 设为模板 class,一个实例 它将只能处理具有特定签名的任务 (比如
std::packaged_task<int()>
)。我希望一个线程池对象能够处理任何类型的任务。
如果我使用
std::packaged_task<void()>
并调用函数 return是一个整数,或者任何东西,那就是未定义的行为。
我碰巧有一个实现可以做到这一点。我的做法是将 std::packaged_task
对象包装在一个抽象出 return 类型的结构中。将任务提交到线程池的方法 return 是结果的未来。
这种方法可行,但由于每个任务都需要内存分配,因此它不适合非常短且非常频繁的任务(我试图用它来并行化流体模拟的块,但开销是太高了,对于 324 个任务来说大约是几毫秒)。
关键部分是这个结构:
struct abstract_packaged_task
{
template <typename R>
abstract_packaged_task(std::packaged_task<R> &&task):
m_task((void*)(new std::packaged_task<R>(std::move(task)))),
m_call_exec([](abstract_packaged_task *instance)mutable{
(*(std::packaged_task<R>*)instance->m_task)();
}),
m_call_delete([](abstract_packaged_task *instance)mutable{
delete (std::packaged_task<R>*)(instance->m_task);
})
{
}
abstract_packaged_task(abstract_packaged_task &&other);
~abstract_packaged_task();
void operator()();
void *m_task;
std::function<void(abstract_packaged_task*)> m_call_exec;
std::function<void(abstract_packaged_task*)> m_call_delete;
};
如您所见,它通过使用带有 std::function
和 void*
的 lambda 来隐藏类型依赖性。如果您知道所有可能出现的 std::packaged_task
对象的最大大小(我根本没有检查该大小是否依赖于 R
),您可以尝试通过删除内存分配来进一步优化它。
提交到线程池的方法然后是这样的:
template <typename R>
std::future<R> submit_task(std::packaged_task<R()> &&task)
{
assert(m_workers.size() > 0);
std::future<R> result = task.get_future();
{
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_task_queue.emplace_back(std::move(task));
}
m_queue_wakeup.notify_one();
return result;
}
其中 m_task_queue
是 abstract_packaged_task
结构的 std::deque
。 m_queue_wakeup
是一个 std::condition_variable
来唤醒工作线程来接任务。工作线程实现非常简单:
void ThreadPool::worker_impl()
{
std::unique_lock<std::mutex> lock(m_queue_mutex, std::defer_lock);
while (!m_terminated) {
lock.lock();
while (m_task_queue.empty()) {
m_queue_wakeup.wait(lock);
if (m_terminated) {
return;
}
}
abstract_packaged_task task(std::move(m_task_queue.front()));
m_task_queue.pop_front();
lock.unlock();
task();
}
}
你可以在我的 github.
上查看完整的 source code and the corresponding header所以困难的部分是 packaged_task<R()>
是只移动的,否则你可以把它扔到 std::function<void()>
和 运行 你的线程中。
有几种解决方法。
首先,可笑的是,用一个packaged_task<void()>
存储一个packaged_task<R()>
。我建议不要这样做,但它确实有效。 ;)(operator()
在 packaged_task<R()>
上的签名是什么?您传递给 packaged_task<void()>
的对象所需的签名是什么?)
其次,将您的 packaged_task<R()>
包装在 shared_ptr
中,将其捕获在具有签名 void()
的 lambda 中,将其存储在 std::function<void()>
中,然后完成。这有间接费用,但可能比第一个解决方案少。
最后,编写您自己的仅移动函数包装器。对于签名 void()
,它很短:
struct task {
template<class F,
class dF=std::decay_t<F>,
class=decltype( std::declval<dF&>()() )
>
task( F&& f ):
ptr(
new dF(std::forward<F>(f)),
[](void* ptr){ delete static_cast<dF*>(ptr); }
),
invoke([](void*ptr){
(*static_cast<dF*>(ptr))();
})
{}
void operator()()const{
invoke( ptr.get() );
}
task(task&&)=default;
task&operator=(task&&)=default;
task()=default;
~task()=default;
explicit operator bool()const{return static_cast<bool>(ptr);}
private:
std::unique_ptr<void, void(*)(void*)> ptr;
void(*invoke)(void*) = nullptr;
};
简单。上面可以为任何类型 R
存储 packaged_task<R()>
,并在以后调用它们。
这具有相对最小的开销 -- 它应该比 std::function
便宜,至少是我见过的实现 -- 除了它不执行 SBO(小缓冲区优化),它存储小函数对象在内部而不是在堆上。
如果需要,您可以通过小的缓冲区优化来改进 unique_ptr<> ptr
容器。