c++ 线程池:替代 std::function 将 functions/lambdas 传递给线程?

c++ thread pool: alternative to std::function for passing functions/lambdas to threads?

我有一个线程池,用于执行许多小作业(数百万个作业,每个 dozens/hundreds 毫秒)。作业以以下任一形式传递:

std::bind(&fn, arg1, arg2, arg3...)

[&](){fn(arg1, arg2, arg3...);}

线程池像这样处理它们:

std::queue<std::function<void(void)>> queue;

void addJob(std::function<void(void)> fn)
{
    queue.emplace_back(std::move(fn));
}

相当标准的东西....除了我注意到一个瓶颈,如果作业在足够快的时间内(小于一毫秒)执行,从 lambda/binder 到 std::function 的转换在 addJob 函数中实际上比执行作业本身花费的时间更长。读了一些书后,std::function 出了名的慢,所以我的瓶颈并不一定是意外的。

有没有更快的方法来做这类事情?我研究了插入式 std::function 替代品,但它们要么与我的编译器不兼容,要么速度不快。我还研究了 Don Clugston 的 "fast delegates",但他们似乎不允许将参数与函数一起传递(也许我没有正确理解它们?)。

我正在用 VS2015u3 编译,传递给作业的函数都是静态的,它们的参数是 ints/floats 或指向其他对象的指针。

每个任务类型都有一个单独的队列 - 您可能没有数以万计的任务类型。这些中的每一个都可以是例如你的任务的静态成员。然后 addJob() 实际上是 Task 的构造函数,它是完全类型安全的。

然后定义任务类型的编译时列表并通过模板元编程访问它 (for_each)。它会更快,因为您不需要任何虚拟调用 fnptr / std::function<> 来实现这一点。

这仅在您的元组代码看到所有任务 类 时才有效(因此您不能通过从光盘加载图像来将任务的新后代添加到已经 运行 的可执行文件中- 希望这不是问题)。

template<typename D> // CRTP on D
class Task {
public:
    // you might want to static_assert at some point that D is in TaskTypeList

    Task() : it_(tasks_.end()) {} // call enqueue() in descendant

    ~Task() {
        // add your favorite lock here
        if (queued()) {
            tasks_.erase(it_);
        }
    }

    bool queued() const { return it_ != tasks_.end(); }

    static size_t ExecNext() {
        if (!tasks_.empty()) {
            // add your favorite lock here
            auto&& itTask = tasks_.begin();
            tasks_.pop_front();
            // release lock
            (*itTask)();
            itTask->it_ = tasks_.end();
        }
        return tasks_.size();
    }

protected:
    void enqueue() const
    {
        // add your favorite lock here
        tasks_.push_back(static_cast<D*>(this));
        it_ = tasks_.rbegin();
    }

private:
    std::list<D*>::iterator it_;

    static std::list<D*> tasks_; // you can have one per thread, too - then you don't need locking, but tasks are assigned to threads statically
};

struct MyTask : Task<MyTask> {
    MyTask() { enqueue(); } // call enqueue only when the class is ready
    void operator()() { /* add task here */ }
    // ...
};

struct MyTask2; // etc.

template<typename...>
struct list_ {};

using TaskTypeList = list_<MyTask, MyTask2>;

void thread_pocess(list_<>) {}

template<typename TaskType, typename... TaskTypes>
void thread_pocess(list_<TaskType, TaskTypes...>)
{
    TaskType::ExecNext();
    thread_process(list_<TaskTypes...>());
}

void thread_process(void*)
{
    for (;;) {
        thread_process(TaskTypeList());
    }
}

这段代码有很多需要调整的地方:不同的线程应该从队列的不同部分开始(或者一个人会使用一个环,或者几个队列,或者 static/dynamic 分配给线程),你会当绝对没有任务时让它进入睡眠状态,可以有一个任务枚举等。

请注意,这不能与任意 lambda 一起使用:您需要列出任务类型。您需要 'communicate' 声明它的函数中的 lambda 类型(例如,通过返回 `std::make_pair(retval, list_) 有时这并不容易。但是,您始终可以转换lambda 到函子,这很简单 - 只是丑陋。