无法正确停止线程池

Cannot correctly stop thread pool

这是我的 ThreadPool 实现。我已经在一个简单的 main 函数中试用过它,但无法正确地停止它:析构函数在工作线程真正启动之前就被调用了,整个程序最终陷入死锁(卡在 t.join() 上),因为在线程执行到条件变量的 wait 函数之前,notify_all 就已经被调用了。

有什么解决办法吗?或者有更好的实现方式?

ThreadPool.cpp

#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>


namespace Concurrency {
/// Fixed-size thread pool whose workers execute queued tasks returning RetType.
///
/// Workers are started in the constructor and joined in the destructor.
/// Tasks that are already queued when the pool is destroyed are still
/// executed before the workers exit, so futures returned by Push() do not
/// end up with a broken promise.
template <typename RetType>
class StandardThreadPool : public ThreadPool<RetType> {
private:
  typedef std::function<RetType()> taskType;
  ThreadSafeQueue<std::packaged_task<RetType()>> queue;

  // queueMutex serialises "look for work, otherwise go to sleep" in the
  // workers against "publish work / request stop" in Push() and
  // terminateThreads(). Without it a notification can be sent before the
  // worker reaches wait() and is lost forever.
  std::mutex queueMutex;
  std::condition_variable queueCondition;

  std::vector<std::thread> poolThreads;
  std::atomic<bool> stopThreadsFlag{false};

  /// Worker loop: run queued tasks until the queue is empty and a stop was
  /// requested.
  void threadWork() {
    std::cout << "thread:" << std::this_thread::get_id() << " started\n";
    std::unique_lock<std::mutex> lock(queueMutex);
    while (true) {
      auto task = queue.Pop();

      if (task) {
        // Never run user code while holding the mutex: it would serialise
        // all workers and block Push() for the duration of the task.
        lock.unlock();
        (*task)();
        lock.lock();
        continue;
      }

      // The queue is empty; only now is it safe to honour a stop request.
      if (stopThreadsFlag.load())
        break;

      // State was checked under the lock, and both Push() and
      // terminateThreads() change state under the same lock, so no wake-up
      // can slip in between the check and the wait. Spurious wake-ups are
      // harmless because the loop re-checks the queue and the flag.
      queueCondition.wait(lock);
    }
    std::cout << "thread:" << std::this_thread::get_id() << " finished\n";
  }

  /// Creates one (not yet running) thread slot per configured worker.
  void initThreadPool() {
    poolThreads.resize(this->maxThreads);
  }

  /// Starts a worker in every slot. If a thread cannot be created, the
  /// workers started so far are stopped and joined before rethrowing, since
  /// destroying a joinable std::thread would call std::terminate.
  void startThreads() {
    try {
      for (auto &worker : poolThreads) {
        worker = std::thread(&StandardThreadPool::threadWork, this);
      }
    } catch (...) {
      terminateThreads();
      throw;
    }
  }

  /// Requests a stop, wakes every worker and waits for all of them to exit.
  void terminateThreads() {
    {
      // Set the flag under the mutex so that a worker cannot test the flag,
      // miss the notification below and then sleep forever.
      std::lock_guard<std::mutex> lock(queueMutex);
      stopThreadsFlag.store(true);
    }
    queueCondition.notify_all();

    for (auto &t : poolThreads) {
      if (t.joinable())
        t.join();
    }
  }

public:
  /// @param maxThreads number of worker threads to start.
  StandardThreadPool(int maxThreads) : ThreadPool<RetType>(maxThreads) {
    initThreadPool();
    startThreads();
  }

  /// Queues a task for execution on one of the workers.
  /// @param task callable to run; it is moved into the queue.
  /// @return future that receives the task's result (or its exception).
  std::future<RetType> Push(taskType &&task) override {
    std::packaged_task<RetType()> pt(std::move(task));
    auto future = pt.get_future();
    {
      // Publish the task under the mutex the workers sleep on; see the
      // comment on queueMutex.
      std::lock_guard<std::mutex> lock(queueMutex);
      queue.Push(std::move(pt));
    }

    queueCondition.notify_one();
    return future;
  }

  /// Stops the pool; blocks until queued tasks have run and workers exited.
  ~StandardThreadPool() {
    std::cout << "destructor called\n";
    terminateThreads();
  }
};

} // namespace Concurrency


namespace Concurrency {
/// Unbounded FIFO queue that can be used from several threads at once.
///
/// Implemented as a singly linked list with a trailing dummy node: `head`
/// owns the list and `tail` always points at the dummy. Producers only take
/// tailMutex; consumers take headMutex and, briefly, tailMutex.
template <typename T> class ThreadSafeQueue {
private:
  struct node {
    std::shared_ptr<T> data;
    std::unique_ptr<node> next;
  };

  std::mutex headMutex;
  std::mutex tailMutex;

  std::unique_ptr<node> head;
  node *tail;

  /// Returns the current dummy tail node, read under tailMutex.
  node *getTail() {
    std::lock_guard<std::mutex> lock(tailMutex);
    return tail;
  }

  /// Detaches and returns the first node, or nullptr if the queue is empty.
  std::unique_ptr<node> popHead() {
    std::lock_guard<std::mutex> lock(headMutex);

    // head == tail means only the dummy node is left, i.e. no elements.
    if (head.get() == getTail())
      return nullptr;

    std::unique_ptr<node> oldHead(std::move(head));
    head = std::move(oldHead->next);

    return oldHead;
  }

  /// Stores newData in the current dummy node and appends a fresh dummy.
  void pushData(std::shared_ptr<T> newData) {
    // Allocate outside the lock to keep the critical section short.
    std::unique_ptr<node> pNew(new node);
    node *const newTail = pNew.get();

    std::lock_guard<std::mutex> lock(tailMutex);
    tail->data = std::move(newData);
    tail->next = std::move(pNew);

    tail = newTail;
  }

public:
  ThreadSafeQueue() : head(new node), tail(head.get()) {}

  /// Releases the nodes one at a time. The implicit destructor would destroy
  /// the unique_ptr chain recursively, one stack frame per queued element,
  /// which can overflow the stack for long queues.
  ~ThreadSafeQueue() {
    while (head) {
      std::unique_ptr<node> rest = std::move(head->next);
      head = std::move(rest);
    }
  }

  /// Removes the oldest element.
  /// @return the element, or nullptr if the queue is empty (never blocks).
  std::shared_ptr<T> Pop() {
    auto oldHead = popHead();

    return oldHead ? oldHead->data : nullptr;
  }

  /// Appends a copy of newValue; the argument is left untouched.
  void Push(const T &newValue) {
    pushData(std::make_shared<T>(newValue));
  }

  /// Appends newValue by moving from it.
  void Push(T &&newValue) {
    pushData(std::make_shared<T>(std::move(newValue)));
  }

  ThreadSafeQueue(const ThreadSafeQueue &) = delete;
  ThreadSafeQueue &operator=(const ThreadSafeQueue &) = delete;
};
} // namespace Concurrency
#include <functional>
#include <future>

namespace Concurrency {

/// Abstract interface of a thread pool whose tasks return RetType.
template <typename RetType> class ThreadPool {
public:
  /// Number of worker threads the pool was configured with.
  int maxThreads;

public:
  typedef std::function<RetType()> taskType;
  ThreadPool(int maxThreads):maxThreads(maxThreads){}

  /// Virtual so that a concrete pool deleted through a ThreadPool pointer
  /// runs its own destructor; without it that delete is undefined behaviour.
  virtual ~ThreadPool() = default;

  /// Queues newTask for execution.
  /// @return future through which the task's result can be retrieved.
  virtual std::future<RetType> Push(taskType &&newTask) = 0;

  // Pools are neither copyable nor movable.
  ThreadPool(const ThreadPool &) = delete;
  ThreadPool(const ThreadPool &&) = delete;
  ThreadPool &operator=(const ThreadPool &) = delete;
  ThreadPool &operator=(ThreadPool &&) = delete;
};
} // namespace Concurrency

main.cpp

int main() {
  Concurrency::StandardThreadPool<int> th(1);
  auto fun = []() {
    std::cout << "function running\n";
    return 2;
  };

  th.Push(fun);

  return EXIT_SUCCESS;
}

首先,一个正确的线程安全队列。

/// Blocking FIFO queue shared between threads, with an abort switch.
///
/// Once aborted, the queue drops its contents, rejects new elements and
/// releases every thread blocked in pop().
template<class T>
struct threadsafe_queue {
  /// Blocks until an element is available or the queue has been aborted.
  /// @return the oldest element, or an empty optional if aborted.
  [[nodiscard]] std::optional<T> pop() {
    std::unique_lock<std::mutex> guard = lock();
    while (!is_aborted() && data.empty())
      cv.wait(guard);
    if (is_aborted())
      return std::nullopt;
    std::optional<T> item(std::move(data.front()));
    data.pop_front();
    // Same condition variable also serves wait_until_empty(), so wake all.
    cv.notify_all();
    return item;
  }

  /// Appends t unless the queue has been aborted.
  /// @return true if the element was queued, false if the queue is aborted.
  bool push(T t) {
    std::unique_lock<std::mutex> guard = lock();
    if (!is_aborted()) {
      data.push_back(std::move(t));
      cv.notify_one();
      return true;
    }
    return false;
  }

  /// Marks the queue aborted, discards pending elements and wakes waiters.
  void set_abort_flag() {
    // The flag is atomic, but the lock is still required so that a waiter
    // cannot miss the notification between its check and its wait.
    std::unique_lock<std::mutex> guard = lock();
    aborted.store(true);
    data.clear();
    cv.notify_all();
  }

  /// @return true once set_abort_flag() has been called.
  [[nodiscard]] bool is_aborted() const { return aborted.load(); }

  /// Blocks until the queue holds no elements.
  void wait_until_empty() {
    std::unique_lock<std::mutex> guard = lock();
    while (!data.empty())
      cv.wait(guard);
  }

private:
  /// Acquires the queue mutex and returns the owning lock.
  std::unique_lock<std::mutex> lock() {
    return std::unique_lock<std::mutex>{m};
  }

  std::condition_variable cv;
  std::mutex m;
  std::atomic<bool> aborted{false};
  std::deque<T> data;
};

这在内部处理中止等。

然后我们的线程池变成:

/// Thread pool that runs arbitrary nullary tasks on a fixed set of workers.
///
/// Destroying the pool waits until the queue has been drained, then aborts
/// the queue and joins every worker.
struct threadpool {
  /// Starts `count` worker threads.
  /// If a thread cannot be created, the workers started so far are stopped
  /// and joined before the exception propagates; otherwise destroying the
  /// still-joinable threads would call std::terminate.
  explicit threadpool(std::size_t count)
  {
    threads.reserve(count);
    try {
      for (std::size_t i = 0; i < count; ++i) {
        threads.emplace_back([this]{
          // abort handled by empty pop:
          while( auto f = queue.pop() ) {
            (*f)();
          }
        });
      }
    } catch (...) {
      queue.set_abort_flag();
      for (std::thread& t:threads)
        t.join();
      throw;
    }
  }

  threadpool(const threadpool&) = delete;
  threadpool& operator=(const threadpool&) = delete;

  /// Drops queued tasks and makes the workers leave once they are idle.
  /// Tasks that are already running are not interrupted.
  void set_abort_flag() {
    queue.set_abort_flag();
  }

  /// @return true once the pool has been aborted.
  [[nodiscard]] bool is_aborted() const {
    return queue.is_aborted();
  }

  ~threadpool() {
    queue.wait_until_empty();
    queue.set_abort_flag(); // get threads to leave the queue
    for (std::thread& t:threads)
      t.join();
  }

  /// Queues f for execution.
  /// @return a future for f's result, or a default-constructed (invalid)
  ///         future if the pool was already aborted.
  template<class F,
    // std::result_of is deprecated in C++17 and removed in C++20.
    class R=std::invoke_result_t<F>
  >
  std::future<R> push_task( F f ) {
    std::packaged_task<R()> task( std::move(f) );
    auto ret = task.get_future();
    // A packaged_task<void()> can wrap a packaged_task<R()>: invoking the
    // inner task returns void and delivers the result through `ret`.
    if (queue.push( std::packaged_task<void()>(std::move(task)) ))
      return ret;
    else
      return {}; // cannot push, already aborted
  }
private:
  // Type-erased to void(): the queue never needs the tasks' return values,
  // so one pool can carry tasks of any return type.
  threadsafe_queue<std::packaged_task<void()>> queue;
  std::vector<std::thread> threads;
};

在 C++17 之前的标准中(例如 C++11/14),您可以将 std::optional 换成 std::unique_ptr,代价是更多的运行时开销。

这里的技巧是 std::packaged_task<void()> 可以存储 std::packaged_task<R()>。我们不需要队列中的返回值。因此,一个线程池可以处理返回类型各不相同的任意任务——它并不关心返回类型。

我只在 threadpool 销毁时才对线程执行 join。也可以改为在中止之后就执行 join。

销毁 threadpool 时会等待所有任务完成。请注意,中止 threadpool 可能不会中止正在执行的任务。您可能想要添加的一项功能是:把中止 API/标志传递给任务,这样任务在被要求时可以提前中止。

要把它做到工业级强度很难,因为理想情况下,任务中的所有阻塞操作也都应当留意被中止的可能性。

Live example.

您可以添加第二个 cv,专门用于在弹出之后发出通知,并且只让 wait_until_empty 在它上面等待。这样可以避免一些虚假唤醒。