c++ 线程池:替代 std::function 将 functions/lambdas 传递给线程?
c++ thread pool: alternative to std::function for passing functions/lambdas to threads?
我有一个线程池,用于执行许多小作业(数百万个作业,每个 dozens/hundreds 毫秒)。作业以以下任一形式传递:
std::bind(&fn, arg1, arg2, arg3...)
或
[&](){fn(arg1, arg2, arg3...);}
线程池像这样处理它们:
std::queue<std::function<void(void)>> queue;
void addJob(std::function<void(void)> fn)
{
queue.emplace_back(std::move(fn));
}
相当标准的东西....除了我注意到一个瓶颈,如果作业在足够快的时间内(小于一毫秒)执行,从 lambda/binder 到 std::function 的转换在 addJob 函数中实际上比执行作业本身花费的时间更长。读了一些书后,std::function 出了名的慢,所以我的瓶颈并不一定是意外的。
有没有更快的方法来做这类事情?我研究了插入式 std::function 替代品,但它们要么与我的编译器不兼容,要么速度不快。我还研究了 Don Clugston 的 "fast delegates",但他们似乎不允许将参数与函数一起传递(也许我没有正确理解它们?)。
我正在用 VS2015u3 编译,传递给作业的函数都是静态的,它们的参数是 ints/floats 或指向其他对象的指针。
每个任务类型都有一个单独的队列 - 您可能没有数以万计的任务类型。这些中的每一个都可以是例如你的任务的静态成员。然后 addJob()
实际上是 Task 的构造函数,它是完全类型安全的。
然后定义任务类型的编译时列表并通过模板元编程访问它 (for_each)。它会更快,因为您不需要任何虚拟调用 fnptr / std::function<>
来实现这一点。
这仅在您的元组代码看到所有任务 类 时才有效(因此您不能通过从光盘加载图像来将任务的新后代添加到已经 运行 的可执行文件中- 希望这不是问题)。
template<typename D> // CRTP on D
class Task {
public:
// you might want to static_assert at some point that D is in TaskTypeList
Task() : it_(tasks_.end()) {} // call enqueue() in descendant
~Task() {
// add your favorite lock here
if (queued()) {
tasks_.erase(it_);
}
}
bool queued() const { return it_ != tasks_.end(); }
static size_t ExecNext() {
if (!tasks_.empty()) {
// add your favorite lock here
auto&& itTask = tasks_.begin();
tasks_.pop_front();
// release lock
(*itTask)();
itTask->it_ = tasks_.end();
}
return tasks_.size();
}
protected:
void enqueue() const
{
// add your favorite lock here
tasks_.push_back(static_cast<D*>(this));
it_ = tasks_.rbegin();
}
private:
std::list<D*>::iterator it_;
static std::list<D*> tasks_; // you can have one per thread, too - then you don't need locking, but tasks are assigned to threads statically
};
struct MyTask : Task<MyTask> {
MyTask() { enqueue(); } // call enqueue only when the class is ready
void operator()() { /* add task here */ }
// ...
};
struct MyTask2; // etc.
template<typename...>
struct list_ {};
using TaskTypeList = list_<MyTask, MyTask2>;
void thread_pocess(list_<>) {}
template<typename TaskType, typename... TaskTypes>
void thread_pocess(list_<TaskType, TaskTypes...>)
{
TaskType::ExecNext();
thread_process(list_<TaskTypes...>());
}
void thread_process(void*)
{
for (;;) {
thread_process(TaskTypeList());
}
}
这段代码有很多需要调整的地方:不同的线程应该从队列的不同部分开始(或者一个人会使用一个环,或者几个队列,或者 static/dynamic 分配给线程),你会当绝对没有任务时让它进入睡眠状态,可以有一个任务枚举等。
请注意,这不能与任意 lambda 一起使用:您需要列出任务类型。您需要 'communicate' 声明它的函数中的 lambda 类型(例如,通过返回 `std::make_pair(retval, list_) 有时这并不容易。但是,您始终可以转换lambda 到函子,这很简单 - 只是丑陋。
我有一个线程池,用于执行许多小作业(数百万个作业,每个 dozens/hundreds 毫秒)。作业以以下任一形式传递:
std::bind(&fn, arg1, arg2, arg3...)
或
[&](){fn(arg1, arg2, arg3...);}
线程池像这样处理它们:
std::queue<std::function<void(void)>> queue;
void addJob(std::function<void(void)> fn)
{
queue.emplace_back(std::move(fn));
}
相当标准的东西....除了我注意到一个瓶颈,如果作业在足够快的时间内(小于一毫秒)执行,从 lambda/binder 到 std::function 的转换在 addJob 函数中实际上比执行作业本身花费的时间更长。读了一些书后,std::function 出了名的慢,所以我的瓶颈并不一定是意外的。
有没有更快的方法来做这类事情?我研究了插入式 std::function 替代品,但它们要么与我的编译器不兼容,要么速度不快。我还研究了 Don Clugston 的 "fast delegates",但他们似乎不允许将参数与函数一起传递(也许我没有正确理解它们?)。
我正在用 VS2015u3 编译,传递给作业的函数都是静态的,它们的参数是 ints/floats 或指向其他对象的指针。
每个任务类型都有一个单独的队列 - 您可能没有数以万计的任务类型。这些中的每一个都可以是例如你的任务的静态成员。然后 addJob()
实际上是 Task 的构造函数,它是完全类型安全的。
然后定义任务类型的编译时列表并通过模板元编程访问它 (for_each)。它会更快,因为您不需要任何虚拟调用 fnptr / std::function<>
来实现这一点。
这仅在您的元组代码看到所有任务 类 时才有效(因此您不能通过从光盘加载图像来将任务的新后代添加到已经 运行 的可执行文件中- 希望这不是问题)。
template<typename D> // CRTP on D
class Task {
public:
// you might want to static_assert at some point that D is in TaskTypeList
Task() : it_(tasks_.end()) {} // call enqueue() in descendant
~Task() {
// add your favorite lock here
if (queued()) {
tasks_.erase(it_);
}
}
bool queued() const { return it_ != tasks_.end(); }
static size_t ExecNext() {
if (!tasks_.empty()) {
// add your favorite lock here
auto&& itTask = tasks_.begin();
tasks_.pop_front();
// release lock
(*itTask)();
itTask->it_ = tasks_.end();
}
return tasks_.size();
}
protected:
void enqueue() const
{
// add your favorite lock here
tasks_.push_back(static_cast<D*>(this));
it_ = tasks_.rbegin();
}
private:
std::list<D*>::iterator it_;
static std::list<D*> tasks_; // you can have one per thread, too - then you don't need locking, but tasks are assigned to threads statically
};
struct MyTask : Task<MyTask> {
MyTask() { enqueue(); } // call enqueue only when the class is ready
void operator()() { /* add task here */ }
// ...
};
struct MyTask2; // etc.
template<typename...>
struct list_ {};
using TaskTypeList = list_<MyTask, MyTask2>;
void thread_pocess(list_<>) {}
template<typename TaskType, typename... TaskTypes>
void thread_pocess(list_<TaskType, TaskTypes...>)
{
TaskType::ExecNext();
thread_process(list_<TaskTypes...>());
}
void thread_process(void*)
{
for (;;) {
thread_process(TaskTypeList());
}
}
这段代码有很多需要调整的地方:不同的线程应该从队列的不同部分开始(或者一个人会使用一个环,或者几个队列,或者 static/dynamic 分配给线程),你会当绝对没有任务时让它进入睡眠状态,可以有一个任务枚举等。
请注意,这不能与任意 lambda 一起使用:您需要列出任务类型。您需要 'communicate' 声明它的函数中的 lambda 类型(例如,通过返回 `std::make_pair(retval, list_) 有时这并不容易。但是,您始终可以转换lambda 到函子,这很简单 - 只是丑陋。