Multiple producer-consumer problem sticks on last consume

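I am trying to solve a multiple producer-consumer problem using pthreads and semaphores, but it always gets stuck on the last consume and halts. There are NO_ITEMS items in total, and the buffer is assumed to hold BUFFER_SIZE items.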

Below is my current code.

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100

using namespace std;

void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();

sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;

stack<int> items;
static int count = 0;



int main()
{
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, BUFFER_SIZE);
    pthread_mutex_init(&mutex, nullptr);
    pthread_t p1, c1, c2, c3;

    pthread_create(&p1, nullptr, thread_producer, nullptr);
    pthread_create(&c1, nullptr, thread_consumer, nullptr);
    pthread_create(&c2, nullptr, thread_consumer, nullptr);
    pthread_create(&c3, nullptr, thread_consumer, nullptr);

    pthread_join(p1, nullptr);
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);

    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    pthread_mutex_destroy(&mutex);

    return 0;
}

void* thread_consumer(void* args) {

    while (count < NO_ITEMS) {
        sem_wait(&fillCount);
        pthread_mutex_lock(&mutex);

        if (!items.empty() && count < NO_ITEMS - 1) {
            removeItem();
        }

        count++;
        pthread_mutex_unlock(&mutex);
        sem_post(&emptyCount);
    }

    return nullptr;
}

void* thread_producer(void* args) {
    for (int i = 0; i < NO_ITEMS; i++) {
        sem_wait(&emptyCount);
        pthread_mutex_lock(&mutex);

        addItem(i);
        // sleep(1);

        pthread_mutex_unlock(&mutex);
        sem_post(&fillCount);
    }

    return nullptr;

}

void addItem(int i) {
    cout << "Produced: " << i << endl;
    items.push(i);
}

void removeItem() {
    cout << "Consumed: " << items.top() << endl;
    items.pop();

}

Here is part of the output:

Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt

Flawed logic

Your code has a logic problem. Suppose NO_ITEMS is 100, and 99 have so far been consumed. Let two consumer threads arrive at the top of the while loop at that point, and suppose that both read count as 99 (but see below), and therefore enter the body of the loop. Both consumers will block on sem_wait(), but there is at most one more item to be produced, so the producer will increment the semaphore at most once more, leaving at least one of the consumers blocked indefinitely.

Undefined behavior

Moreover, your thread_consumer() function contains a data race, which leaves your program's behavior undefined. Specifically, the read of the shared variable count in the while condition is not synchronized with the writes that other threads perform under the mutex. Although one cannot reliably predict how UB will manifest (else it would not be "undefined"), it is fairly common for unsynchronized accesses to manifest as apparent failures of one thread to see another thread's updates to shared variables. Such a failure mode would explain your particular observed behavior all by itself.

Very likely, a correct fix to this synchronization problem would also fix the logic problem.
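To make the synchronization point concrete, here is a minimal sketch of a shared counter that is both read and written only while a mutex is held. The names countMutex, consumedCount, allConsumed(), worker(), and runCounterDemo() are illustrative assumptions, not part of your program; they stand in for your mutex and count.

```cpp
#include <pthread.h>

// Hypothetical stand-ins for the question's `mutex` and `count`.
pthread_mutex_t countMutex = PTHREAD_MUTEX_INITIALIZER;
int consumedCount = 0;  // only accessed while countMutex is held

// Synchronized read of the counter, suitable for a loop condition.
bool allConsumed(int limit) {
    pthread_mutex_lock(&countMutex);
    bool done = consumedCount >= limit;
    pthread_mutex_unlock(&countMutex);
    return done;
}

// Each worker increments the counter 1000 times, always under the mutex.
void* worker(void*) {
    for (int i = 0; i < 1000; ++i) {
        pthread_mutex_lock(&countMutex);
        ++consumedCount;
        pthread_mutex_unlock(&countMutex);
    }
    return nullptr;
}

// Runs four workers to completion and returns the final counter value.
int runCounterDemo() {
    pthread_mutex_lock(&countMutex);
    consumedCount = 0;
    pthread_mutex_unlock(&countMutex);

    pthread_t threads[4];
    for (auto& t : threads) pthread_create(&t, nullptr, worker, nullptr);
    for (auto& t : threads) pthread_join(t, nullptr);

    pthread_mutex_lock(&countMutex);
    int result = consumedCount;
    pthread_mutex_unlock(&countMutex);
    return result;
}
```

In your consumer, the equivalent change would be to replace the bare `count < NO_ITEMS` test in the while condition with a check like allConsumed() that takes the mutex before reading.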

Solutions

There are multiple possible solutions. Here are some promising ones:

  1. Semaphores are not a particularly comfortable fit for the problem. You need a mutex anyway, and its usual counterpart for signaling is a condition variable. I would convert the two semaphores to two (or maybe just one) ordinary integer variables, and use a standard mutex + CV pattern in both producer and consumer. That would include adding mutex protection for the read of count in the consumer.

  2. On the other hand, if you are obligated to use semaphores, then you could

    • add appropriate mutex protection for the consumers' read of count
    • be sure to retain the consumers' test for whether they can actually consume an item after successfully decrementing the semaphore
    • have the main program post to fillCount twice (that is, number of consumers - 1 times) after joining the producer thread but before attempting to join the consumers. This will unblock any consumers that expected to consume an item but are still waiting after another consumer has taken the last one.

  3. Or you could employ a hybrid: retain the emptyCount semaphore to limit the number of items waiting at any given time (instead of switching to a CV for that purpose), but switch to a mutex + CV pattern for managing the consumers.
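As an illustration of option 2, here is a rough sketch that keeps both semaphores. It is one possible arrangement rather than a definitive implementation, and it assumes a platform where unnamed POSIX semaphores (sem_init) work, as on Linux. The names kBufferSize, kNoItems, kNumConsumers, bufMutex, consumed, producerThread(), consumerThread(), and runSemaphoreDemo() are made up for the example; printing is omitted to keep it short.

```cpp
#include <pthread.h>
#include <semaphore.h>
#include <stack>

constexpr int kBufferSize = 50;    // plays the role of BUFFER_SIZE
constexpr int kNoItems = 100;      // plays the role of NO_ITEMS
constexpr int kNumConsumers = 3;

sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t bufMutex = PTHREAD_MUTEX_INITIALIZER;
std::stack<int> items;  // protected by bufMutex
int consumed = 0;       // protected by bufMutex

void* producerThread(void*) {
    for (int i = 0; i < kNoItems; ++i) {
        sem_wait(&emptyCount);
        pthread_mutex_lock(&bufMutex);
        items.push(i);
        pthread_mutex_unlock(&bufMutex);
        sem_post(&fillCount);
    }
    return nullptr;
}

void* consumerThread(void*) {
    for (;;) {
        // Bullet 1: read the shared counter only while holding the mutex.
        pthread_mutex_lock(&bufMutex);
        bool done = consumed >= kNoItems;
        pthread_mutex_unlock(&bufMutex);
        if (done) break;

        sem_wait(&fillCount);
        pthread_mutex_lock(&bufMutex);
        // Bullet 2: a wake-up may come from main's extra posts,
        // so check that there really is an item before consuming.
        bool took = false;
        if (!items.empty()) {
            items.pop();
            ++consumed;
            took = true;
        }
        pthread_mutex_unlock(&bufMutex);
        if (took) sem_post(&emptyCount);
    }
    return nullptr;
}

// Runs one producer and kNumConsumers consumers; returns items consumed.
int runSemaphoreDemo() {
    consumed = 0;
    while (!items.empty()) items.pop();
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, kBufferSize);

    pthread_t producer;
    pthread_t consumers[kNumConsumers];
    pthread_create(&producer, nullptr, producerThread, nullptr);
    for (auto& c : consumers) pthread_create(&c, nullptr, consumerThread, nullptr);

    pthread_join(producer, nullptr);
    // Bullet 3: release consumers still blocked after the last item is taken.
    for (int i = 0; i < kNumConsumers - 1; ++i) sem_post(&fillCount);
    for (auto& c : consumers) pthread_join(c, nullptr);

    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    return consumed;  // safe to read: all threads have been joined
}
```

In terms of your main(), the third bullet amounts to two sem_post(&fillCount) calls placed between pthread_join(p1, nullptr) and pthread_join(c1, nullptr). A consumer woken by one of those posts finds the stack empty, skips the removal, re-checks the counter under the mutex, and exits.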