每次都在 C 中工作最后一个消费者(生产者消费者问题)
Everytime works last consumer (producer consumer problem) in C
我有 10 个消费者线程和 1 个生产者线程。生产者线程产生一个随机整数并将其插入缓冲区。消费者线程从这个缓冲区中获取并删除一个项目并打印它。
一切都很好(我在,没有无限循环或阻塞的东西)。但我认为,只有一个工作的消费者线程是第 10 个(最后一个)消费者线程。其他 9 个消费者线程不起作用。我意识到当我在消费者线程方法中打印消费者 ID 时。为什么其他 9 个消费者线程不起作用,这种问题可以做什么?
下面是我的代码:
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
typedef int buffer_item;
#define BUFFER_SIZE 5
#define RAND_DIVISOR 100000000
#define TRUE 1
pthread_mutex_t mutex;
sem_t full, empty;
buffer_item buffer[BUFFER_SIZE];
int counter;
pthread_t tid;
pthread_attr_t attr;
void *producer(void *param);
void *consumer(void *param);
int insert_item(buffer_item item) {
if(counter < BUFFER_SIZE) {
buffer[counter] = item;
counter++;
return 0;
}
else {
return -1;
}
}
int remove_item(buffer_item *item) {
if(counter > 0) {
*item = buffer[(counter-1)];
counter--;
return 0;
}
else {
return -1;
}
}
void initializeData() {
pthread_mutex_init(&mutex, NULL);
sem_init(&full, 0, 0);
sem_init(&empty, 0, BUFFER_SIZE);
pthread_attr_init(&attr);
counter = 0;
}
void *producer(void *param) {
buffer_item item;
while(TRUE) {
int rNum = rand() / RAND_DIVISOR;
sleep(1);
item = rand()%100;
sem_wait(&empty);
pthread_mutex_lock(&mutex);
if(insert_item(item)) {
fprintf(stderr, " Producer report error condition\n");
}
else {
printf("producer produced %d\n", item);
}
pthread_mutex_unlock(&mutex);
sem_post(&full);
}
}
void *consumer(void *param) {
buffer_item item;
int* consumerID=(int*)param;
while(TRUE) {
int rNum = rand() / RAND_DIVISOR;
sleep(1);
sem_wait(&full);
pthread_mutex_lock(&mutex);
if(remove_item(&item)) {
fprintf(stderr, "Consumer report error condition\n");
}
else {
printf("consumer %d consumed %d\n" ,*consumerID, item);
}
pthread_mutex_unlock(&mutex);
sem_post(&empty);
}
}
int main(int argc, char *argv[]) {
/* Loop counter */
int i;
int numProd = 1; /* Number of producer threads */
int numCons = 10; /* Number of consumer threads */
/* Initialize the app */
initializeData();
/* Create the producer threads */
for(i = 0; i < numProd; i++) {
/* Create the thread */
pthread_create(&tid,&attr,producer,NULL);
}
/* Create the consumer threads */
for(i = 0; i < numCons; i++) {
/* Create the thread */
pthread_create(&tid,&attr,consumer,(void*)&i);
}
/* Sleep for the specified amount of time in milliseconds */
sleep(10);
/* Exit the program */
printf("Exit the program\n");
exit(0);
}
我的输出是:
producer produced 27
consumer 10 consumed 27
producer produced 63
consumer 10 consumed 63
producer produced 26
consumer 10 consumed 26
producer produced 11
consumer 10 consumed 11
producer produced 29
consumer 10 consumed 29
producer produced 62
consumer 10 consumed 62
producer produced 35
consumer 10 consumed 35
producer produced 22
consumer 10 consumed 22
producer produced 67
consumer 10 consumed 67
Exit the program
在:
for(i = 0; i < numCons; i++) {
/* Create the thread */
pthread_create(&tid,&attr,consumer,(void*)&i);
您正在向每个线程传递一个指向 i
的指针并保留一个指向它的指针,该指针将遵循 i
的值
您需要在线程函数的开头复制到一个新变量,但重要的是要知道线程函数不会立即启动当pthread_create叫做。很可能在循环结束并且 i = 10 之前它们不会开始。所以可能发生的情况是您实际上有 10 个消费者,但他们都有相同的数量。
如果您想使用 i
作为 ID,您应该在创建新线程之前等待来自目标线程的信号量(这将在从 *param 分配后发布)。
如果你想要一个非常简单的测试,你可以在调用每个 pthread_create 之后添加一个 sleep(1) 调用。这应该给每个线程时间来启动并正确分配一个 ID。
Shawn 先于我,但他是对的。请参阅以下实现:
/* Consumer Thread */
void *consumer(void *param) {
buffer_item item;
int* consumerID=(int*)param;
printf("consumer %d created\n" ,*consumerID);
while(TRUE) {
/* sleep for a random period of time */
int rNum = rand() / RAND_DIVISOR;
sleep(1);
/* aquire the full lock */
sem_wait(&full)%100;
/* aquire the mutex lock */
pthread_mutex_lock(&mutex);
if(remove_item(&item)) {
//fprintf(stderr, "Consumer report error condition: consumer %d item: %d\n", *consumerID, item);
}
else {
printf("consumer %d consumed %d\n" ,*consumerID, item);
}
/* release the mutex lock */
pthread_mutex_unlock(&mutex);
/* signal empty */
sem_post(&empty);
}
}
long taskids[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
/* Create the consumer threads */
for(i = 0; i < numCons; i++) {
taskids[i] = i;
/* Create the thread */
pthread_create(&tid,&attr,consumer, taskids + i);
}
这导致:
consumer 0 created
consumer 1 created
consumer 2 created
consumer 5 created
consumer 3 created
consumer 6 created
consumer 9 created
consumer 7 created
consumer 8 created
consumer 4 created
producer produced 65
consumer 9 consumed 65
producer produced 57
consumer 6 consumed 57
producer produced 33
consumer 5 consumed 33
producer produced 57
consumer 1 consumed 57
producer produced 3
consumer 3 consumed 3
producer produced 81
consumer 9 consumed 81
producer produced 1
consumer 5 consumed 1
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class consumeprodue<BlockingQue> {
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
public static void main(String[] args) {
Thread p = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println(getName() + " produced :" + i);
queue.put(i);
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread c = new Thread() {
public void run() {
try {
while (true) {
System.out.println(getName() + "consume :" + queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
p.start();
c.start();
}
}
我有 10 个消费者线程和 1 个生产者线程。生产者线程产生一个随机整数并将其插入缓冲区。消费者线程从这个缓冲区中获取并删除一个项目并打印它。
一切都很好(我在,没有无限循环或阻塞的东西)。但我认为,只有一个工作的消费者线程是第 10 个(最后一个)消费者线程。其他 9 个消费者线程不起作用。我意识到当我在消费者线程方法中打印消费者 ID 时。为什么其他 9 个消费者线程不起作用,这种问题可以做什么?
下面是我的代码:
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
typedef int buffer_item;
#define BUFFER_SIZE 5
#define RAND_DIVISOR 100000000
#define TRUE 1
pthread_mutex_t mutex;
sem_t full, empty;
buffer_item buffer[BUFFER_SIZE];
int counter;
pthread_t tid;
pthread_attr_t attr;
void *producer(void *param);
void *consumer(void *param);
int insert_item(buffer_item item) {
if(counter < BUFFER_SIZE) {
buffer[counter] = item;
counter++;
return 0;
}
else {
return -1;
}
}
int remove_item(buffer_item *item) {
if(counter > 0) {
*item = buffer[(counter-1)];
counter--;
return 0;
}
else {
return -1;
}
}
void initializeData() {
pthread_mutex_init(&mutex, NULL);
sem_init(&full, 0, 0);
sem_init(&empty, 0, BUFFER_SIZE);
pthread_attr_init(&attr);
counter = 0;
}
void *producer(void *param) {
buffer_item item;
while(TRUE) {
int rNum = rand() / RAND_DIVISOR;
sleep(1);
item = rand()%100;
sem_wait(&empty);
pthread_mutex_lock(&mutex);
if(insert_item(item)) {
fprintf(stderr, " Producer report error condition\n");
}
else {
printf("producer produced %d\n", item);
}
pthread_mutex_unlock(&mutex);
sem_post(&full);
}
}
void *consumer(void *param) {
buffer_item item;
int* consumerID=(int*)param;
while(TRUE) {
int rNum = rand() / RAND_DIVISOR;
sleep(1);
sem_wait(&full);
pthread_mutex_lock(&mutex);
if(remove_item(&item)) {
fprintf(stderr, "Consumer report error condition\n");
}
else {
printf("consumer %d consumed %d\n" ,*consumerID, item);
}
pthread_mutex_unlock(&mutex);
sem_post(&empty);
}
}
int main(int argc, char *argv[]) {
/* Loop counter */
int i;
int numProd = 1; /* Number of producer threads */
int numCons = 10; /* Number of consumer threads */
/* Initialize the app */
initializeData();
/* Create the producer threads */
for(i = 0; i < numProd; i++) {
/* Create the thread */
pthread_create(&tid,&attr,producer,NULL);
}
/* Create the consumer threads */
for(i = 0; i < numCons; i++) {
/* Create the thread */
pthread_create(&tid,&attr,consumer,(void*)&i);
}
/* Sleep for the specified amount of time in milliseconds */
sleep(10);
/* Exit the program */
printf("Exit the program\n");
exit(0);
}
我的输出是:
producer produced 27
consumer 10 consumed 27
producer produced 63
consumer 10 consumed 63
producer produced 26
consumer 10 consumed 26
producer produced 11
consumer 10 consumed 11
producer produced 29
consumer 10 consumed 29
producer produced 62
consumer 10 consumed 62
producer produced 35
consumer 10 consumed 35
producer produced 22
consumer 10 consumed 22
producer produced 67
consumer 10 consumed 67
Exit the program
在:
for(i = 0; i < numCons; i++) {
/* Create the thread */
pthread_create(&tid,&attr,consumer,(void*)&i);
您正在向每个线程传递一个指向 i
的指针并保留一个指向它的指针,该指针将遵循 i
您需要在线程函数的开头复制到一个新变量,但重要的是要知道线程函数不会立即启动当pthread_create叫做。很可能在循环结束并且 i = 10 之前它们不会开始。所以可能发生的情况是您实际上有 10 个消费者,但他们都有相同的数量。
如果您想使用 i
作为 ID,您应该在创建新线程之前等待来自目标线程的信号量(这将在从 *param 分配后发布)。
如果你想要一个非常简单的测试,你可以在调用每个 pthread_create 之后添加一个 sleep(1) 调用。这应该给每个线程时间来启动并正确分配一个 ID。
Shawn 先于我,但他是对的。请参阅以下实现:
/* Consumer Thread */
void *consumer(void *param) {
buffer_item item;
int* consumerID=(int*)param;
printf("consumer %d created\n" ,*consumerID);
while(TRUE) {
/* sleep for a random period of time */
int rNum = rand() / RAND_DIVISOR;
sleep(1);
/* aquire the full lock */
sem_wait(&full)%100;
/* aquire the mutex lock */
pthread_mutex_lock(&mutex);
if(remove_item(&item)) {
//fprintf(stderr, "Consumer report error condition: consumer %d item: %d\n", *consumerID, item);
}
else {
printf("consumer %d consumed %d\n" ,*consumerID, item);
}
/* release the mutex lock */
pthread_mutex_unlock(&mutex);
/* signal empty */
sem_post(&empty);
}
}
long taskids[] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
/* Create the consumer threads */
for(i = 0; i < numCons; i++) {
taskids[i] = i;
/* Create the thread */
pthread_create(&tid,&attr,consumer, taskids + i);
}
这导致:
consumer 0 created
consumer 1 created
consumer 2 created
consumer 5 created
consumer 3 created
consumer 6 created
consumer 9 created
consumer 7 created
consumer 8 created
consumer 4 created
producer produced 65
consumer 9 consumed 65
producer produced 57
consumer 6 consumed 57
producer produced 33
consumer 5 consumed 33
producer produced 57
consumer 1 consumed 57
producer produced 3
consumer 3 consumed 3
producer produced 81
consumer 9 consumed 81
producer produced 1
consumer 5 consumed 1
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class consumeprodue<BlockingQue> {
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
public static void main(String[] args) {
Thread p = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println(getName() + " produced :" + i);
queue.put(i);
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread c = new Thread() {
public void run() {
try {
while (true) {
System.out.println(getName() + "consume :" + queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
p.start();
c.start();
}
}