改进的生产者消费者算法
modified producer consumer algorithm
我正在修改 producer/consumer 问题。然而,存在竞争条件,我正在讨论解决这个问题的最佳方法。可能有更简洁的方法,我想知道是否有人做过类似的事情,如果可能的话,分享更好的解决方案。
它使用队列正常启动 producer/consumer。一个生产者线程从磁盘读取项目并在共享队列中排队。然后多个消费者线程尝试使项目出列以进行处理。但是,每个项目都有一个必须与消费者标签匹配的标签(如线程 ID)。消费者线程查看队列的前端并检查项目的标签。如果它与消费者线程的标签不匹配,消费者必须进入休眠状态并等待,直到队列的前端有与其标签匹配的项目。有点混乱,但下面的伪代码有望阐明算法:
struct item
{
// This is unique tag that only a specific consumer can consumer
int consumerTag;
// data for the consumer to consume
void *pData;
}
///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag to a specific consumer
while (item = read())
{
lock(queue)
if (queueNotFull)
{
enqueue(item);
}
else
{
// check front of the queue, notify worker.
Sleep(); // Releases Queue Mutex upon entering
// requires the mutex after it has been awaken
}
unlock(queue);
wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is it like at thread id,
// each consumer has a unique tag
myTag = getThreadTAG()
while (true)
{
lock (queue);
if (queueNotEmpty)
{
item = queueFront()
if (myTag == item->id)
{
// this item is for me, let's dequeue and process
item = dequeue();
process();
}
else
{
// This is not for me let's go to sleep
Sleep(); // Releases Queue Mutex
// re-acquire mutex
}
}
else
{
Sleep(); // Releases Queue Mutex
// re-acquire mutex
}
unlock (queue);
wakeUpProducer();
}
但是上面的算法存在问题。让我们考虑以下事件并假设:
item.tag=1表示该商品只能由具有相同标签的消费者消费。我将其表示为 consumer.tag = 1
- 生产者读取
item.tag=1
并入队
- 生产者唤醒所有消费者线程(
consumer.tag=1
、consumer.tag=2
等...现在都醒了并检查队列的前端)
- 生产者读取
item.tag=2
并入队
- 生产者唤醒所有消费者线程
- 队列现在有
[item.tag=1, item.tag=2]
consumer.tag=2 wakes up and peek at the front of the queue, but
item.tag=1 与 consumer.tag=1
不匹配;因此,它进入睡眠状态。 consumer.tag=2
正在睡觉。
consumer.tag=1
醒来并查看队列的前面,item.tag=1
匹配 consumer.tag=1
。出列并通知生产者它可以消耗更多。
- 生产者读完数据退出。现在队列有
item.tag=2
并且 consumer.tag=2
正在休眠并且从不使用该数据。请注意,可以有很多消费者。所以最后很多消费者可以睡觉和队列
我想在生产者线程的末尾添加一个循环,不断唤醒所有休眠线程,直到队列为空。
// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
WakeUpAllConsumer();
Sleep();
}
但我相信一定有更优雅的方式来处理这个问题。任何想法让我知道
谢谢!
我前一段时间遇到了类似的事情(在所有线程都可以处理所有项目的设置中),以及我在那里使用的解决方案,尽管不是那么优雅是最后一次唤醒所有人当生产者读完数据。
在这里,这不会真正起作用,因为如果您的队列中有第三个项目,那么该项目可能会落在后面。我建议的是以下两种方式之一:
每次消费者将项目出列时唤醒所有线程。这是我能想到的确保没有遗漏任何东西的唯一方法。 (只有isProducerFinishedReading == true
才能保存一个resources/time.
重新设计系统有 10 个队列,然后当一个项目被添加到队列 n
时,消费者线程 n
被唤醒。处理完元素后,它会再次检查队列以查找要处理的新项目。在任何情况下,生产者都应在读取完成后检查所有队列的长度并唤醒相应的线程。
希望对您有所帮助。
编辑:
- 每次线程完成工作时,它应该再次检查队列,如果那里的项目 "his",那么它就开始工作。如果一个线程可以唤醒其他线程,那么它应该唤醒相应的线程。
我正在修改 producer/consumer 问题。然而,存在竞争条件,我正在讨论解决这个问题的最佳方法。可能有更简洁的方法,我想知道是否有人做过类似的事情,如果可能的话,分享更好的解决方案。
它使用队列正常启动 producer/consumer。一个生产者线程从磁盘读取项目并在共享队列中排队。然后多个消费者线程尝试使项目出列以进行处理。但是,每个项目都有一个必须与消费者标签匹配的标签(如线程 ID)。消费者线程查看队列的前端并检查项目的标签。如果它与消费者线程的标签不匹配,消费者必须进入休眠状态并等待,直到队列的前端有与其标签匹配的项目。有点混乱,但下面的伪代码有望阐明算法:
struct item
{
// This is unique tag that only a specific consumer can consumer
int consumerTag;
// data for the consumer to consume
void *pData;
}
///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag to a specific consumer
while (item = read())
{
lock(queue)
if (queueNotFull)
{
enqueue(item);
}
else
{
// check front of the queue, notify worker.
Sleep(); // Releases Queue Mutex upon entering
// requires the mutex after it has been awaken
}
unlock(queue);
wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is it like at thread id,
// each consumer has a unique tag
myTag = getThreadTAG()
while (true)
{
lock (queue);
if (queueNotEmpty)
{
item = queueFront()
if (myTag == item->id)
{
// this item is for me, let's dequeue and process
item = dequeue();
process();
}
else
{
// This is not for me let's go to sleep
Sleep(); // Releases Queue Mutex
// re-acquire mutex
}
}
else
{
Sleep(); // Releases Queue Mutex
// re-acquire mutex
}
unlock (queue);
wakeUpProducer();
}
但是上面的算法存在问题。让我们考虑以下事件并假设:
item.tag=1表示该商品只能由具有相同标签的消费者消费。我将其表示为 consumer.tag = 1
- 生产者读取
item.tag=1
并入队 - 生产者唤醒所有消费者线程(
consumer.tag=1
、consumer.tag=2
等...现在都醒了并检查队列的前端) - 生产者读取
item.tag=2
并入队 - 生产者唤醒所有消费者线程
- 队列现在有
[item.tag=1, item.tag=2]
consumer.tag=2 wakes up and peek at the front of the queue, but
item.tag=1 与consumer.tag=1
不匹配;因此,它进入睡眠状态。consumer.tag=2
正在睡觉。consumer.tag=1
醒来并查看队列的前面,item.tag=1
匹配consumer.tag=1
。出列并通知生产者它可以消耗更多。- 生产者读完数据退出。现在队列有
item.tag=2
并且consumer.tag=2
正在休眠并且从不使用该数据。请注意,可以有很多消费者。所以最后很多消费者可以睡觉和队列
我想在生产者线程的末尾添加一个循环,不断唤醒所有休眠线程,直到队列为空。
// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
WakeUpAllConsumer();
Sleep();
}
但我相信一定有更优雅的方式来处理这个问题。任何想法让我知道
谢谢!
我前一段时间遇到了类似的事情(在所有线程都可以处理所有项目的设置中),以及我在那里使用的解决方案,尽管不是那么优雅是最后一次唤醒所有人当生产者读完数据。
在这里,这不会真正起作用,因为如果您的队列中有第三个项目,那么该项目可能会落在后面。我建议的是以下两种方式之一:
每次消费者将项目出列时唤醒所有线程。这是我能想到的确保没有遗漏任何东西的唯一方法。 (只有
isProducerFinishedReading == true
才能保存一个resources/time.重新设计系统有 10 个队列,然后当一个项目被添加到队列
n
时,消费者线程n
被唤醒。处理完元素后,它会再次检查队列以查找要处理的新项目。在任何情况下,生产者都应在读取完成后检查所有队列的长度并唤醒相应的线程。
希望对您有所帮助。
编辑:
- 每次线程完成工作时,它应该再次检查队列,如果那里的项目 "his",那么它就开始工作。如果一个线程可以唤醒其他线程,那么它应该唤醒相应的线程。