线程安全的 FIFO / Queue(多个生产者,一个消费者)
Thread safe FIFO / Queue (Multiple producers, one consumer)
正如标题所说,我正在尝试编写一个 queue 可以由多个线程写入并由一个线程读取。作为一个额外的困难,我需要 queue 输入保持有序(先进先出)。这就是我迷路的地方。互斥量不一定按照它们被锁定的顺序被唤醒,所以我不知道我可以使用什么来实现我想要的?这是一个简单的程序,说明了我正在尝试做的事情:
#include "Queue.h"
#include <Windows.h>
#include <fstream>
#include <mutex>
using std::ofstream;
ofstream myFile("result.txt");
Queue<int> myQueue;
DWORD WINAPI WritingThread(LPVOID lpParam);
DWORD WINAPI LockingThread(LPVOID lpParam);
int main()
{
// This thread will block myQueue for 3 seconds
CreateThread(NULL, 0, LockingThread, NULL, 0, NULL);
// During the locked period, I ask myQueue to push numbers from 0 to 49
for (int i = 0; i < 50; i++)
CreateThread(NULL, 0, WritingThread, (LPVOID)new int(i), 0, NULL);
// If the mutex could wake up in order, myQueue would pop up the numbers in order, but it doesn't.
for (int i = 0; i < 50; i++)
myFile << myQueue.pop() << ",";
return EXIT_SUCCESS;
}
DWORD WINAPI LockingThread( LPVOID lpParam )
{
myQueue.lockQueueFor3Seconds();
return 0;
}
DWORD WINAPI WritingThread( LPVOID lpParam )
{
myQueue.push(*(int*)lpParam);
return 0;
}
class Queue 的代码已被采用 there, see the bottom of the article for full code. 我所做的只是添加方法 "lockQueueFor3Seconds" 用于测试目的。该方法定义如下:
void lockQueueFor3Seconds()
{
std::unique_lock<std::mutex> mlock(mutex_);
Sleep(3000);
}
该测试的输出如下所示:
1,43,39,46,36,44,49,40,35,42,32,31,28,41,27,38,24,23,20,34,19,16,15,12,37,11,7,8,3,33,30,0,45,4,26,18,48,21,47,22,25,17,14,10,6,29,9,2,13,5
如您所见,显然没有订购。感谢您的帮助!
编辑:我修改了 queue 以便它为每个推送调用分配一个数字来表示它们的顺序,当互斥体解锁时,queue 检查以确保轮到正确的方法在添加元素之前,否则它会返回等待。不确定我是否正确实施了这个,但它似乎有效!完整代码可见there.
给线程分配要添加的值并期望它们按顺序添加是行不通的,因为您无法强制线程执行的顺序。
相反,让每个线程在运行时添加下一个数字(无论它是什么)。像这样:
std::atomic_int counter;
DWORD WINAPI WritingThread( LPVOID lpParam )
{
myQueue.push( counter++ );
return 0;
}
编辑:增量是原子的是不够的。增量和推送到队列需要是单个原子操作。这意味着将锁定变量暴露在 class 之外(它已经是 public)。
std::atomic_int counter;
DWORD WINAPI WritingThread( LPVOID lpParam )
{
unique_lock<mutex> lock(myQueue.m_mutex);
myQueue.push( counter++ );
return 0;
}
如果您的 mutex
实现允许同一个线程多次调用它,那将起作用。否则,您可以执行类似的操作:
void pushAndIncrement(T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
++item;
mlock.unlock();
cond_.notify_one();
}
我认为您的解决方案(您说有效)仍然存在竞争条件。如果在递增字母值之后但在递增 push
内的 counter
值之前有上下文切换,它将以错误的顺序添加字母。它是如此之小window,它可能不太可能发生,但如果你将计数器增量放在与推送相同的锁中,那么每次都会完美。
正如标题所说,我正在尝试编写一个 queue 可以由多个线程写入并由一个线程读取。作为一个额外的困难,我需要 queue 输入保持有序(先进先出)。这就是我迷路的地方。互斥量不一定按照它们被锁定的顺序被唤醒,所以我不知道我可以使用什么来实现我想要的?这是一个简单的程序,说明了我正在尝试做的事情:
#include "Queue.h"
#include <Windows.h>
#include <fstream>
#include <mutex>
using std::ofstream;
ofstream myFile("result.txt");
Queue<int> myQueue;
DWORD WINAPI WritingThread(LPVOID lpParam);
DWORD WINAPI LockingThread(LPVOID lpParam);
int main()
{
// This thread will block myQueue for 3 seconds
CreateThread(NULL, 0, LockingThread, NULL, 0, NULL);
// During the locked period, I ask myQueue to push numbers from 0 to 49
for (int i = 0; i < 50; i++)
CreateThread(NULL, 0, WritingThread, (LPVOID)new int(i), 0, NULL);
// If the mutex could wake up in order, myQueue would pop up the numbers in order, but it doesn't.
for (int i = 0; i < 50; i++)
myFile << myQueue.pop() << ",";
return EXIT_SUCCESS;
}
DWORD WINAPI LockingThread( LPVOID lpParam )
{
myQueue.lockQueueFor3Seconds();
return 0;
}
DWORD WINAPI WritingThread( LPVOID lpParam )
{
myQueue.push(*(int*)lpParam);
return 0;
}
class Queue 的代码已被采用 there, see the bottom of the article for full code. 我所做的只是添加方法 "lockQueueFor3Seconds" 用于测试目的。该方法定义如下:
void lockQueueFor3Seconds()
{
std::unique_lock<std::mutex> mlock(mutex_);
Sleep(3000);
}
该测试的输出如下所示:
1,43,39,46,36,44,49,40,35,42,32,31,28,41,27,38,24,23,20,34,19,16,15,12,37,11,7,8,3,33,30,0,45,4,26,18,48,21,47,22,25,17,14,10,6,29,9,2,13,5
如您所见,显然没有订购。感谢您的帮助!
编辑:我修改了 queue 以便它为每个推送调用分配一个数字来表示它们的顺序,当互斥体解锁时,queue 检查以确保轮到正确的方法在添加元素之前,否则它会返回等待。不确定我是否正确实施了这个,但它似乎有效!完整代码可见there.
给线程分配要添加的值并期望它们按顺序添加是行不通的,因为您无法强制线程执行的顺序。
相反,让每个线程在运行时添加下一个数字(无论它是什么)。像这样:
std::atomic_int counter;
DWORD WINAPI WritingThread( LPVOID lpParam )
{
myQueue.push( counter++ );
return 0;
}
编辑:增量是原子的是不够的。增量和推送到队列需要是单个原子操作。这意味着将锁定变量暴露在 class 之外(它已经是 public)。
std::atomic_int counter;
DWORD WINAPI WritingThread( LPVOID lpParam )
{
unique_lock<mutex> lock(myQueue.m_mutex);
myQueue.push( counter++ );
return 0;
}
如果您的 mutex
实现允许同一个线程多次调用它,那将起作用。否则,您可以执行类似的操作:
void pushAndIncrement(T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
++item;
mlock.unlock();
cond_.notify_one();
}
我认为您的解决方案(您说有效)仍然存在竞争条件。如果在递增字母值之后但在递增 push
内的 counter
值之前有上下文切换,它将以错误的顺序添加字母。它是如此之小window,它可能不太可能发生,但如果你将计数器增量放在与推送相同的锁中,那么每次都会完美。