C++ 创建一个作业容器,你也可以添加函数,它将在一个线程中启动,然后在完成后被删除

C++ Create a job container that you can add functions too, that will start in a thread, and then be removed once it is done

我想创建一个可以将函数推送到其中的容器,该容器将立即在线程中启动。一旦功能完成,它应该自动从容器中移除,这样容器就不会无限增长。

这是我目前的尝试:

#include <thread>
#include <future>
#include <iostream>
#include <map>

class j_thread {
    std::thread thread;
public:

    j_thread() {}

    template<typename F>
    j_thread(const F& f) : thread(f) {}

    j_thread& operator = (j_thread&& other) {
        this->thread.swap(other.thread);
        return *this;
    }

    virtual ~j_thread() {
        thread.join();
    }
};

class jobs {

    std::map<size_t, j_thread> threads;

public:

    template<typename F>
    void add_job(const F &function) {

        size_t job_id = threads.size();

        auto wrapped_function = [&function, job_id, this]() {
            function();
            threads.erase(job_id);
        };

        threads[job_id] = j_thread(wrapped_function);
    }

    void wait_for_all() {
        while(threads.size() != 0) {}
    }
};


int main() {

    jobs j;
    j.add_job([](){std::cout << "hello" << std::endl;});
    j.add_job([](){std::cout << "world" << std::endl;});
    j.wait_for_all();
}

但是当运行报错时:

terminate called after throwing an instance of 'std::system_error'
  what():  Invalid argument
hello
terminate called recursively
12:15:44: The program has unexpectedly finished.

在线程的 body 中调用 join 是未定义的行为。

查看 join 的错误条件:

Error Conditions resource_deadlock_would_occur if this->get_id() == std::this_thread::get_id() (deadlock detected)

您的 body 是:

    auto wrapped_function = [&function, job_id, this]() {
        function();
        threads.erase(job_id);
    };

在您调用 erase 的地方,正在调用 jthread 的 dtor,它在可连接的线程上调用 join

而不是 join,在 dtor 中你应该调用 detach.

为了避免悬空引用 function 必须按值捕获。

当调用 sizeerase:

时,您还必须添加一些互斥锁以避免地图上的数据竞争
std::mutex m;

int size() {
    std::lock_guard<std::mutex> lock{m};
    return threads.size();
}

auto wrapped_function = [f = function, job_id, this]() {
    f();
    std::lock_guard<std::mutex> l(m);
    threads.erase(job_id);
};

void wait_for_all() {
    while(size() != 0) {}
}

Demo