将工作线程与主线程同步
Synchronize worker threads with a main thread
如果工作线程可以生成其他任务,如何正确同步工作线程与主线程?我使用 std::queue 来维护由互斥锁和原子变量保护的任务,以跟踪繁忙的线程。不幸的是,我在执行结束时遇到了死锁。
我从我的项目中提取了代码并创建了以下示例(您可以使用 g++ 或 MSVC 轻松编译它):
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
#include <functional>
#include <stack>
#include <atomic>
#include <queue>
template <class T, class Compare>
class USort {
using Task = std::pair<T*, T*>;
private:
size_t m_ThreadsNum;
std::atomic<bool> m_Finished;
std::atomic<size_t> m_Busy;
std::thread* m_Threads;
std::queue<Task> m_Tasks;
size_t m_Size;
T* m_Data;
Compare m_Comparator;
std::condition_variable m_WaitFinished;
std::condition_variable m_WaitSorter;
std::mutex m_TaskQueueMutex;
private:
const size_t THREAD_THRESHOLD = 1024;
const size_t THREAD_POOL_THRESHOLD = 8192;
bool HasTask() {
std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
return m_Tasks.size() > 0;
}
bool PopTask(T** L, T** R) {
std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
if (m_Tasks.size() == 0) {
*L = *R = nullptr;
return false;
}
*L = m_Tasks.front().first;
*R = m_Tasks.front().second;
m_Tasks.pop();
return true;
}
void PushTask(T* L, T* R) {
std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
m_Tasks.emplace(std::pair<T*, T*>(L, R));
m_WaitSorter.notify_one();
}
void SortThread(size_t Id) {
std::mutex sorter_mutex;
for (;;) {
std::unique_lock<std::mutex> lock(sorter_mutex);
///
/// ----------------------------------> some threads wait here
///
m_WaitSorter.wait(lock, [this]() { return m_Finished || HasTask(); });
if (m_Finished) break;
m_Busy++;
T *left, *right;
while (PopTask(&left, &right)) {
Sort(left, right);
}
if (--m_Busy == 0) {
m_WaitFinished.notify_one();
}
}
}
// just simulate work
void Sort(T* Left, T* Right) {
if (Right - Left > 10) {
PushTask(Left, Right-10);
}
}
void WaitForSortingIsFinished() {
std::mutex finished;
std::unique_lock<std::mutex> lock(finished);
m_WaitFinished.wait(lock, [this]() { return m_Busy == 0 && !HasTask(); });
}
void FinishThreads() {
m_Finished = true;
m_WaitSorter.notify_all();
}
void ReleaseThreads() {
if (m_Threads) {
for (size_t i = 0; i < m_ThreadsNum; i++) {
///
/// ----------------------------------> main thread stuck here
///
m_Threads[i].join();
}
delete[] m_Threads;
m_Threads = nullptr;
}
}
public:
USort(size_t NumberOfThreads = 0) : m_Comparator(Compare()) {
if (NumberOfThreads == 0) {
static const unsigned int max_concurrency = std::thread::hardware_concurrency();
NumberOfThreads = max_concurrency;
if (NumberOfThreads == 0) NumberOfThreads = 4;
}
m_Finished = false;
m_ThreadsNum = NumberOfThreads;
m_Threads = nullptr;
}
~USort() {
ReleaseThreads();
}
void Sort(T* Data, size_t Size) {
// build thread pool
m_Threads = new std::thread[m_ThreadsNum];
for (size_t i = 0; i < m_ThreadsNum; i++) {
m_Threads[i] = std::thread(&USort::SortThread, this, i);
}
// process data
PushTask(Data, Data + Size - 1);
WaitForSortingIsFinished();
FinishThreads();
}
};
template <class T, class Compare>
void usort(T* Data, size_t Size, size_t NumberOfThreads = 0) {
USort<T, Compare> mt_sorter(NumberOfThreads);
mt_sorter.Sort(Data, Size);
}
const size_t ARR_SIZE = 0x00010000;
struct comp {
bool operator()(const int& L, const int& R) const {
return L < R;
}
};
int main()
{
int* arr = new int[ARR_SIZE];
for (int i = 0; i < ARR_SIZE; i++) {
arr[i] = rand() % 3200000;
}
usort<int, comp>(arr, ARR_SIZE, 16);
delete[] arr;
return 0;
}
事实是,在我的示例中线程并不总是完成。有时一些线程在 m_WaitSorter.wait()
中挂起,因此主线程在 m_Threads[i].join();
中挂起。逻辑上的漏洞在哪里。为什么调用 FinishThreads()
没有完成所有线程?
编辑:
基本上我想实现多线程排序算法。
- 主线程创建线程池,将第一个任务(排序整个数组)推入任务队列等待排序完成
- 池线程接受任务,将其划分为更小的任务(1-3)。其中一项任务由当前池线程立即处理,其他任务被推送到队列
- 池线程必须在整个数据集排序完成后才能完成(队列中没有任务,所有池线程都处于待处理状态)
- 排序完成后主线程应该被唤醒
- 主线程应该完成挂起的线程
因此,从我的角度来看,我需要两个 conditional_variabes 在主线程中使用谓词“所有线程都在等待&&队列中没有任务”和“在队列中有任务 || 完成线程”池线程。
好的,我仔细阅读了文档并发现我的代码中存在错误。必须通过相同的互斥体来控制对 notify_one()
、notify_all()
和 wait()
的调用。考虑到这一点,我已经更新并稍微简化了我的代码:
bool WaitAndPopTask(T** L, T** R) {
std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
m_WaitSorter.wait(lock, [this]() { return m_Finished || !m_Tasks.empty(); });
if (m_Finished) return false;
m_Busy++;
*L = m_Tasks.front().first;
*R = m_Tasks.front().second;
m_Tasks.pop();
return true;
}
void SortThread(size_t Id) {
for (;;) {
T *left, *right;
if (!WaitAndPopTask(&left, &right)) break;
Sort(left, right);
std::lock_guard<std::mutex> lk(m_TaskQueueMutex);
if (--m_Busy == 0 && m_Tasks.empty()) {
FinishThreads();
}
}
}
void Sort(T* Data, size_t Size) {
// build thread pool
m_Threads = new std::thread[m_ThreadsNum];
for (size_t i = 0; i < m_ThreadsNum; i++) {
m_Threads[i] = std::thread(&USort::SortThread, this, i);
}
// process data
PushTask(Data, Data + Size - 1);
ReleaseThreads();
}
如果工作线程可以生成其他任务,如何正确同步工作线程与主线程?我使用 std::queue 来维护由互斥锁和原子变量保护的任务,以跟踪繁忙的线程。不幸的是,我在执行结束时遇到了死锁。
我从我的项目中提取了代码并创建了以下示例(您可以使用 g++ 或 MSVC 轻松编译它):
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
#include <functional>
#include <stack>
#include <atomic>
#include <queue>
template <class T, class Compare>
class USort {
using Task = std::pair<T*, T*>;
private:
size_t m_ThreadsNum;
std::atomic<bool> m_Finished;
std::atomic<size_t> m_Busy;
std::thread* m_Threads;
std::queue<Task> m_Tasks;
size_t m_Size;
T* m_Data;
Compare m_Comparator;
std::condition_variable m_WaitFinished;
std::condition_variable m_WaitSorter;
std::mutex m_TaskQueueMutex;
private:
const size_t THREAD_THRESHOLD = 1024;
const size_t THREAD_POOL_THRESHOLD = 8192;
bool HasTask() {
std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
return m_Tasks.size() > 0;
}
bool PopTask(T** L, T** R) {
std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
if (m_Tasks.size() == 0) {
*L = *R = nullptr;
return false;
}
*L = m_Tasks.front().first;
*R = m_Tasks.front().second;
m_Tasks.pop();
return true;
}
void PushTask(T* L, T* R) {
std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
m_Tasks.emplace(std::pair<T*, T*>(L, R));
m_WaitSorter.notify_one();
}
void SortThread(size_t Id) {
std::mutex sorter_mutex;
for (;;) {
std::unique_lock<std::mutex> lock(sorter_mutex);
///
/// ----------------------------------> some threads wait here
///
m_WaitSorter.wait(lock, [this]() { return m_Finished || HasTask(); });
if (m_Finished) break;
m_Busy++;
T *left, *right;
while (PopTask(&left, &right)) {
Sort(left, right);
}
if (--m_Busy == 0) {
m_WaitFinished.notify_one();
}
}
}
// just simulate work
void Sort(T* Left, T* Right) {
if (Right - Left > 10) {
PushTask(Left, Right-10);
}
}
void WaitForSortingIsFinished() {
std::mutex finished;
std::unique_lock<std::mutex> lock(finished);
m_WaitFinished.wait(lock, [this]() { return m_Busy == 0 && !HasTask(); });
}
void FinishThreads() {
m_Finished = true;
m_WaitSorter.notify_all();
}
void ReleaseThreads() {
if (m_Threads) {
for (size_t i = 0; i < m_ThreadsNum; i++) {
///
/// ----------------------------------> main thread stuck here
///
m_Threads[i].join();
}
delete[] m_Threads;
m_Threads = nullptr;
}
}
public:
USort(size_t NumberOfThreads = 0) : m_Comparator(Compare()) {
if (NumberOfThreads == 0) {
static const unsigned int max_concurrency = std::thread::hardware_concurrency();
NumberOfThreads = max_concurrency;
if (NumberOfThreads == 0) NumberOfThreads = 4;
}
m_Finished = false;
m_ThreadsNum = NumberOfThreads;
m_Threads = nullptr;
}
~USort() {
ReleaseThreads();
}
void Sort(T* Data, size_t Size) {
// build thread pool
m_Threads = new std::thread[m_ThreadsNum];
for (size_t i = 0; i < m_ThreadsNum; i++) {
m_Threads[i] = std::thread(&USort::SortThread, this, i);
}
// process data
PushTask(Data, Data + Size - 1);
WaitForSortingIsFinished();
FinishThreads();
}
};
template <class T, class Compare>
void usort(T* Data, size_t Size, size_t NumberOfThreads = 0) {
USort<T, Compare> mt_sorter(NumberOfThreads);
mt_sorter.Sort(Data, Size);
}
const size_t ARR_SIZE = 0x00010000;
struct comp {
bool operator()(const int& L, const int& R) const {
return L < R;
}
};
int main()
{
int* arr = new int[ARR_SIZE];
for (int i = 0; i < ARR_SIZE; i++) {
arr[i] = rand() % 3200000;
}
usort<int, comp>(arr, ARR_SIZE, 16);
delete[] arr;
return 0;
}
事实是,在我的示例中线程并不总是完成。有时一些线程在 m_WaitSorter.wait()
中挂起,因此主线程在 m_Threads[i].join();
中挂起。逻辑上的漏洞在哪里。为什么调用 FinishThreads()
没有完成所有线程?
编辑: 基本上我想实现多线程排序算法。
- 主线程创建线程池,将第一个任务(排序整个数组)推入任务队列等待排序完成
- 池线程接受任务,将其划分为更小的任务(1-3)。其中一项任务由当前池线程立即处理,其他任务被推送到队列
- 池线程必须在整个数据集排序完成后才能完成(队列中没有任务,所有池线程都处于待处理状态)
- 排序完成后主线程应该被唤醒
- 主线程应该完成挂起的线程
因此,从我的角度来看,我需要两个 conditional_variabes 在主线程中使用谓词“所有线程都在等待&&队列中没有任务”和“在队列中有任务 || 完成线程”池线程。
好的,我仔细阅读了文档并发现我的代码中存在错误。必须通过相同的互斥体来控制对 notify_one()
、notify_all()
和 wait()
的调用。考虑到这一点,我已经更新并稍微简化了我的代码:
bool WaitAndPopTask(T** L, T** R) {
std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
m_WaitSorter.wait(lock, [this]() { return m_Finished || !m_Tasks.empty(); });
if (m_Finished) return false;
m_Busy++;
*L = m_Tasks.front().first;
*R = m_Tasks.front().second;
m_Tasks.pop();
return true;
}
void SortThread(size_t Id) {
for (;;) {
T *left, *right;
if (!WaitAndPopTask(&left, &right)) break;
Sort(left, right);
std::lock_guard<std::mutex> lk(m_TaskQueueMutex);
if (--m_Busy == 0 && m_Tasks.empty()) {
FinishThreads();
}
}
}
void Sort(T* Data, size_t Size) {
// build thread pool
m_Threads = new std::thread[m_ThreadsNum];
for (size_t i = 0; i < m_ThreadsNum; i++) {
m_Threads[i] = std::thread(&USort::SortThread, this, i);
}
// process data
PushTask(Data, Data + Size - 1);
ReleaseThreads();
}