条件变量 + 互斥锁 + pthreads 的 C++ 生产者消费者问题
C++ Producer Consumer Problem with condition variable + mutex + pthreads
我需要用 C++ 解决生产者消费者问题,解决 1 个消费者和 6 个生产者,以及 1 个生产者和 6 个消费者,下面是问题的陈述。
问题一:
想象一下,您正在一家非常繁忙的餐厅里等一些朋友,您正在看着等待 table 的工作人员将食物从厨房端到他们的 table 上。这是经典的“生产者-消费者”问题的一个例子。服务器是有限的,厨房不断生产食物。考虑一下服务器(消费者)是有限的,食物是“无限”供应的由厨师(生产者)生产。
一种便于识别并因此减少为“生产者-消费者”问题的方法是限制消费者的数量,从而限制无限次用餐。
在厨房生产。因此,建议设置红绿灯,以控制服务员取餐的生产顺序。
程序类似于:
- 创建信号量;
- 创建服务器和主厨线程;
- 尽可能多地制作食物并记录食物的数量
正在排队;
- 服务器线程将 运行 直到它设法交付在
table秒;和
- 线程必须与主线程“连接”。
还要考虑有6个厨师和1个服务员。如果你愿意,你可以认为厨师做一顿饭需要 50 微秒,而服务员送餐需要 10 微秒。
饭就table。设置要服务的最大客户数。在屏幕上打印,在执行结束时,哪个厨师最闲和最少闲,以及每个厨师做了多少顿饭。
问题二:
考虑到上述餐厅。现在考虑有 1 名厨师和 6 名服务员。假设厨师用 50 微秒做一顿饭,服务员用 15 微秒把饭菜送到 table。设置要服务的最大客户数。
打印哪个服务器最闲和最不闲,每个服务器送了多少餐。
我设法解决了 6 个生产者和 1 个消费者的问题,但是对于 6 个消费者和 1 个生产者来说它不起作用,程序似乎陷入了一些死锁。如果有人知道如何提供帮助,我将不胜感激。
#include <iostream>
#include <random>
#include <chrono>
#include <thread>
#include <mutex>
#include <deque>
//The mutex class is a synchronization primitive that can be used to protect shared data
//from being simultaneously accessed by multiple threads.
std::mutex semaforo; //semafoto para fazer o controle de acesso do buffer
std::condition_variable notifica; //variavel condicional para fazer a notificação de prato consumido/consumido
std::deque<int> buffer; //buffer de inteiros
const unsigned int capacidade_buffer = 10; //tamanho do buffer
const unsigned int numero_pratos = 25; //numeros de pratos a serem produzidos
void produtor()
{
unsigned static int contador_pratos_produzidos = 0;
while (contador_pratos_produzidos < numero_pratos)
{
std::unique_lock<std::mutex> locker(semaforo);
notifica.wait(locker, []
{ return buffer.size() < capacidade_buffer; });
std::this_thread::sleep_for(std::chrono::microseconds(50));
buffer.push_back(contador_pratos_produzidos);
if (contador_pratos_produzidos < numero_pratos)
{
contador_pratos_produzidos++;
}
locker.unlock();
notifica.notify_all();
}
}
void consumidor(int ID, std::vector<int> &consumido)
{
unsigned static int contador_pratos_consumidos = 0;
while (contador_pratos_consumidos < numero_pratos)
{
std::unique_lock<std::mutex> locker(semaforo);
notifica.wait(locker, []
{ return buffer.size() > 0; });
std::this_thread::sleep_for(std::chrono::microseconds(15));
buffer.pop_front();
if (contador_pratos_consumidos < numero_pratos)
{
contador_pratos_consumidos++;
consumido[ID]++;
}
locker.unlock();
notifica.notify_one();
}
}
int main()
{
//vetor para contagem do consumo de cada garcon
std::vector<int> consumido(6, 0);
//vetor de threads garcon(consumidores)
std::vector<std::thread> consumidores;
for (int k = 0; k < 6; k++)
{
consumidores.push_back(std::thread(consumidor, k, std::ref(consumido)));
}
//produtor/chef
std::thread p1(produtor);
for (auto &k : consumidores)
{
k.join();
}
p1.join();
int mais_ocioso = 200, menos_ocioso = 0, mais, menos;
for (int k = 0; k < 6; k++)
{
std::cout << "Garcon " << k + 1 << " entregou " << consumido[k] << " pratos\n";
if (consumido[k] > menos_ocioso)
{
menos = k + 1;
menos_ocioso = consumido[k];
}
if (consumido[k] < mais_ocioso)
{
mais = k + 1;
mais_ocioso = consumido[k];
}
}
std::cout << "\nO mais ocioso foi o garcon " << mais << " e o menos ocioso foi o garcon " << menos << "\n";
}
消费者和生产者函数中都存在完全相同的错误。我将解释其中一个,同样的错误也必须在另一个中修复。
unsigned static int contador_pratos_consumidos = 0;
while (contador_pratos_consumidos < numero_pratos)
{
这个 static
计数器被多个执行线程访问和修改。
任何由多个执行线程使用的非原子对象都必须正确排序(只有在持有适当的互斥体时才能访问)。
如果您将注意力集中在上面的两行代码上,那么很明显可以在没有任何互斥锁保护的情况下访问该计数器。一旦你意识到这一点,错误就很明显了:在某个时候 contador_pratos_consumidos
将恰好比 numero_pratos
少一个。当发生这种情况时,您可以让多个执行线程同时评估 while
条件,并且它们都会很高兴地得出结论,它是真的。
多个执行线程然后进入while
循环。一个人将成功获取互斥锁并消费“产品”,然后完成。剩余的执行线程将永远等待另一个永远不会到达的“产品”。将不再生产更多产品。没有汤给他们。
同样的错误也存在于生产者中,除了错误的影响会相当微妙:最终会生产出比应有的产品更多的产品。
当然,迂腐地讲,所有这些都是未定义的行为,所以任何事情都可能发生,但这些是这种未定义行为的典型、通常的后果。为了使该算法正常工作,必须修复这两个错误。
我需要用 C++ 解决生产者消费者问题,解决 1 个消费者和 6 个生产者,以及 1 个生产者和 6 个消费者,下面是问题的陈述。
问题一: 想象一下,您正在一家非常繁忙的餐厅里等一些朋友,您正在看着等待 table 的工作人员将食物从厨房端到他们的 table 上。这是经典的“生产者-消费者”问题的一个例子。服务器是有限的,厨房不断生产食物。考虑一下服务器(消费者)是有限的,食物是“无限”供应的由厨师(生产者)生产。
一种便于识别并因此减少为“生产者-消费者”问题的方法是限制消费者的数量,从而限制无限次用餐。 在厨房生产。因此,建议设置红绿灯,以控制服务员取餐的生产顺序。
程序类似于:
- 创建信号量;
- 创建服务器和主厨线程;
- 尽可能多地制作食物并记录食物的数量 正在排队;
- 服务器线程将 运行 直到它设法交付在 table秒;和
- 线程必须与主线程“连接”。
还要考虑有6个厨师和1个服务员。如果你愿意,你可以认为厨师做一顿饭需要 50 微秒,而服务员送餐需要 10 微秒。 饭就table。设置要服务的最大客户数。在屏幕上打印,在执行结束时,哪个厨师最闲和最少闲,以及每个厨师做了多少顿饭。
问题二: 考虑到上述餐厅。现在考虑有 1 名厨师和 6 名服务员。假设厨师用 50 微秒做一顿饭,服务员用 15 微秒把饭菜送到 table。设置要服务的最大客户数。 打印哪个服务器最闲和最不闲,每个服务器送了多少餐。
我设法解决了 6 个生产者和 1 个消费者的问题,但是对于 6 个消费者和 1 个生产者来说它不起作用,程序似乎陷入了一些死锁。如果有人知道如何提供帮助,我将不胜感激。
#include <iostream>
#include <random>
#include <chrono>
#include <thread>
#include <mutex>
#include <deque>
//The mutex class is a synchronization primitive that can be used to protect shared data
//from being simultaneously accessed by multiple threads.
std::mutex semaforo; //semafoto para fazer o controle de acesso do buffer
std::condition_variable notifica; //variavel condicional para fazer a notificação de prato consumido/consumido
std::deque<int> buffer; //buffer de inteiros
const unsigned int capacidade_buffer = 10; //tamanho do buffer
const unsigned int numero_pratos = 25; //numeros de pratos a serem produzidos
void produtor()
{
unsigned static int contador_pratos_produzidos = 0;
while (contador_pratos_produzidos < numero_pratos)
{
std::unique_lock<std::mutex> locker(semaforo);
notifica.wait(locker, []
{ return buffer.size() < capacidade_buffer; });
std::this_thread::sleep_for(std::chrono::microseconds(50));
buffer.push_back(contador_pratos_produzidos);
if (contador_pratos_produzidos < numero_pratos)
{
contador_pratos_produzidos++;
}
locker.unlock();
notifica.notify_all();
}
}
void consumidor(int ID, std::vector<int> &consumido)
{
unsigned static int contador_pratos_consumidos = 0;
while (contador_pratos_consumidos < numero_pratos)
{
std::unique_lock<std::mutex> locker(semaforo);
notifica.wait(locker, []
{ return buffer.size() > 0; });
std::this_thread::sleep_for(std::chrono::microseconds(15));
buffer.pop_front();
if (contador_pratos_consumidos < numero_pratos)
{
contador_pratos_consumidos++;
consumido[ID]++;
}
locker.unlock();
notifica.notify_one();
}
}
int main()
{
//vetor para contagem do consumo de cada garcon
std::vector<int> consumido(6, 0);
//vetor de threads garcon(consumidores)
std::vector<std::thread> consumidores;
for (int k = 0; k < 6; k++)
{
consumidores.push_back(std::thread(consumidor, k, std::ref(consumido)));
}
//produtor/chef
std::thread p1(produtor);
for (auto &k : consumidores)
{
k.join();
}
p1.join();
int mais_ocioso = 200, menos_ocioso = 0, mais, menos;
for (int k = 0; k < 6; k++)
{
std::cout << "Garcon " << k + 1 << " entregou " << consumido[k] << " pratos\n";
if (consumido[k] > menos_ocioso)
{
menos = k + 1;
menos_ocioso = consumido[k];
}
if (consumido[k] < mais_ocioso)
{
mais = k + 1;
mais_ocioso = consumido[k];
}
}
std::cout << "\nO mais ocioso foi o garcon " << mais << " e o menos ocioso foi o garcon " << menos << "\n";
}
消费者和生产者函数中都存在完全相同的错误。我将解释其中一个,同样的错误也必须在另一个中修复。
unsigned static int contador_pratos_consumidos = 0;
while (contador_pratos_consumidos < numero_pratos)
{
这个 static
计数器被多个执行线程访问和修改。
任何由多个执行线程使用的非原子对象都必须正确排序(只有在持有适当的互斥体时才能访问)。
如果您将注意力集中在上面的两行代码上,那么很明显可以在没有任何互斥锁保护的情况下访问该计数器。一旦你意识到这一点,错误就很明显了:在某个时候 contador_pratos_consumidos
将恰好比 numero_pratos
少一个。当发生这种情况时,您可以让多个执行线程同时评估 while
条件,并且它们都会很高兴地得出结论,它是真的。
多个执行线程然后进入while
循环。一个人将成功获取互斥锁并消费“产品”,然后完成。剩余的执行线程将永远等待另一个永远不会到达的“产品”。将不再生产更多产品。没有汤给他们。
同样的错误也存在于生产者中,除了错误的影响会相当微妙:最终会生产出比应有的产品更多的产品。
当然,迂腐地讲,所有这些都是未定义的行为,所以任何事情都可能发生,但这些是这种未定义行为的典型、通常的后果。为了使该算法正常工作,必须修复这两个错误。