为什么我的消费者线程在我的生产者线程完成之前就停止了?

Why do my consumer threads stop before my producer threads are done?

我最近写了一个无边界锁 queue 并且正在为它做一些测试。在测试中,一些线程产生素数(从某个数字开始,向上计数生产者线程对数的 6 倍,使用确定性 Miller Rabin 测试检查每个数字,并将素数插入 queue ) 和一些线程消耗素数(通过从 queue 中删除元素并检查它们是否为素数)。生产者线程成对出现,每对中有一个产生等于 1 mod 6 的素数,另一个产生等于 5 mod 6 的素数(所有数字都等于 0、2、3 或4 mod 除了 2 和 3) 之外,6 是复合的,主线程生成 2 和 3。有一个全局计数器,用于计算有多少线程未完成生成。每次生产者线程或主线程完成生成素数时,它都会自动递减该计数器。当它不为 0 时,消费者线程循环。

为了确定素数是否真的通过了 queue,我计算了每个线程产生和消耗的素数的第 0 到第 3 个矩,并检查了矩的总和生产者线程等于消费者线程时刻的总和。第 n 个矩就是 n 次方的和,所以这意味着素数的个数、它们的和、它们的平方和以及它们的立方和都匹配。如果序列是彼此的排列,所有时刻都会匹配,所以虽然我需要检查前 n 以确保长度为 n 的序列实际上是排列,但前 4 个匹配意味着序列不匹配的可能性非常小。

我的释放锁 queue 实际上有效,但由于某些原因,当 queue 中仍有元素时,消费者线程全部停止。我不明白为什么,因为生产者线程仅在将其所有质数插入 queue 后才递减生产者计数器,而在所有生产者线程都将其递减后,生产者计数器只能等于 0。因此,每当生产计数器为 0 时,所有元素都已插入到 queue 中。但是,如果消费者尝试删除一个元素,它应该会成功,因为只有在 queue.full(queue 中的元素数)为 0 时,删除才会失败。因此,当生产者计数器为 0 时,消费者应该在 queue.full 为 0 之前能够成功消费,并且在 queue 耗尽之前不应检查生产计数器和 return。他们只在删除失败时检查生产计数器(以防消费者比生产者快并清空 queue)。

但是,当我使 while 循环围绕删除检查 queue.full 除了生产计数器之外,消费者不会 return 早。也就是说,当我改变

__atomic_load_n(&producing, __ATOMIC_SEQ_CST)

__atomic_load_n(&producing, __ATOMIC_SEQ_CST) || __atomic_load_n(&Q.full, __ATOMIC_SEQ_CST)

它很管用。请注意,我的代码使用了合理数量的 gcc 扩展,例如属性、__atomic 内置函数、__auto_type、语句表达式、128 位整数、__builtin_ctzll 和 '\e',C99 具有这样的功能作为指定的初始值设定项和复合文字,以及 pthreads。我还在使用顺序一致的内存顺序和强大的比较和交换,即使较弱的版本应该可以工作,因为我不希望出现问题,而我却有问题。这里是 header queue.h:

#ifndef __QUEUE_H__
#define __QUEUE_H__

#include <stddef.h>
#include <inttypes.h>

typedef struct __attribute__((__designated_init__)){//using positional initializers for a struct is terrible
    void *buf;
    uint8_t *flags;//insert started, insert complete, remove started
    size_t cap, full;
    uint64_t a, b;
} queue_t;

typedef struct __attribute__((__designated_init__)){
    size_t size;
} queue_ft;//this struct serves as a class for queue objects: any data specific to the object goes in the queue_t struct and any shared data goes here

int queue_insert(queue_t*, const queue_ft*, void *elem);

int queue_remove(queue_t*, const queue_ft*, void *out);

int queue_init(queue_t*, const queue_ft*, size_t reserve);

void queue_destroy(queue_t*, const queue_ft*);

#endif

这是库源queue.c:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include "queue.h"

int queue_insert(queue_t *self, const queue_ft *ft, void *elem){
    uint64_t i;
    while(1){
        uint8_t flag = 0;
        if(__atomic_load_n(&self->full, __ATOMIC_SEQ_CST) == self->cap){
            return 0;
        }
        i = __atomic_load_n(&self->b, __ATOMIC_SEQ_CST);
        if(__atomic_compare_exchange_n(self->flags + i, &flag, 0x80, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)){//set the insert started flag if all flags are clear
            break;
        }
    }
    __atomic_fetch_add(&self->full, 1, __ATOMIC_SEQ_CST);
    uint64_t b = i;
    while(!__atomic_compare_exchange_n(&self->b, &b, (b + 1)%self->cap, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));//increase the b endpoint of the queue with wraparaound
    memcpy(self->buf + i*ft->size, elem, ft->size);//actually insert the item.  accesses to the buffer mirror accesses to the flags so this is safe
    __atomic_thread_fence(memory_order_seq_cst);
    __atomic_store_n(self->flags + i, 0xc0, __ATOMIC_SEQ_CST);//set the insert completed flag
    return 1;
}

int queue_remove(queue_t *self, const queue_ft *ft, void *out){
    uint64_t i;
    while(1){
        uint8_t flag = 0xc0;
        if(!__atomic_load_n(&self->full, __ATOMIC_SEQ_CST)){
            return 0;
        }
        i = __atomic_load_n(&self->a, __ATOMIC_SEQ_CST);
        if(__atomic_compare_exchange_n(self->flags + i, &flag, 0xe0, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)){//set the remove started flag if insert started and insert completed are set but the other flags are clear
            break;
        }
    }
    __atomic_fetch_sub(&self->full, 1, __ATOMIC_SEQ_CST);
    uint64_t a = i;
    while(!__atomic_compare_exchange_n(&self->a, &a, (a + 1)%self->cap, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));//increase the a endpoint of the queue with wraparound
    memcpy(out, self->buf + i*ft->size, ft->size);//actually remove the item.
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
    __atomic_store_n(self->flags + i, 0x00, __ATOMIC_SEQ_CST);//clear all the flags to mark the remove as completed
    return 1;
}

int queue_init(queue_t *self, const queue_ft *ft, size_t reserve){
    void *buf = malloc(reserve*ft->size);
    if(!buf){
        return 0;
    }
    uint8_t *flags = calloc(reserve, sizeof(uint8_t));
    if(!flags){
        free(buf);
        return 0;
    }
    *self = (queue_t){
        .buf=buf,
        .flags=flags,
        .cap=reserve,
        .full=0,
        .a=0,.b=0
    };
    return 1;
}

void queue_destroy(queue_t *self, const queue_ft *ft){
    free(self->buf);
    free(self->flags);
}

这里是测试程序源码test_queue_pc.c:

#define _POSIX_C_SOURCE 201612UL

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <pthread.h>
#include <math.h>
#include <time.h>
#include "queue.h"

//Generate primes up to this number.  Note 78498 is the number of primes below 1000000; this is hard coded because the queue does not support growing yet.
#define MAX 1000000
#define QUEUE_SIZE 78498
#define NUM_PRODUCER_PAIRS 3
#define NUM_CONSUMERS 2
//Every producer and consumer thread calculates the 0th through 3rd moments of the sequence of primes it sees, as well as testing them for primality.
//The nth moment is the sum of the nth powers, thus, the order does not matter and if the primes are the same in both the producers and the consumers
//then the sums of the moments will also be the same.  I check that the 0th through 3rd moments match which means it is nearly certain the primes go through
//the queue.
#define NUM_MOMENTS 4

//Deterministic Miller Rabin witnesses (see https://en.wikipedia.org/wiki/Miller–Rabin_primality_test)
#define DMR_PRIMES (uint64_t[]){2, 13, 23, 1662803}
#define DMR_PRIMES_C 4

//Macro to split an integer into three parts.  The first part has the 2**0, 2**3, 2**6, ..., 2**60 bits of the original and 0 elsewhere.
//The second part has the 2**1, 2**4, 2**7, ..., 2**61 bits of the original and 0 elsewhere.  The last part has the 2**2, ..., 2**62 bits.
//The 2**63 bit is lost.  The masks represent the sums of geometric sequences.  The original number can be obtained by bitwise or or xor on the parts.
//I spread the uint64_t's (which are unsigned long longs) over 3 uint64_t's so that they take up 24 bytes and memcpy'ing them happens in multiple steps.
//This spreading is only done on primes that have been produced before they are put into the queue.  The consumers then recombine and verify them.
#define SPREAD_EMPLACE(n) ({__auto_type _n = (n); &(spread_integer){(_n)&(((1ULL<<60)-1)/7), (_n)&(((1ULL<<61)-2)/7), (_n)&(((1ULL<<62)-4)/7)};})

typedef struct{
    uint64_t x, y, z;
} spread_integer;

queue_ft spread_integer_ft = {.size= sizeof(spread_integer)};

queue_t Q;
//Start producing count at 1 + (NUM_PRODUCING_THREADS << 1) because main generates 2 and 3 and reduce it by 1 every time a producer thread finishes
int producing = 1 + (NUM_PRODUCER_PAIRS << 1);

//Uses the binary algorithm for modular exponentiation (https://en.wikipedia.org/wiki/Exponentiation_by_squaring)
//It is a helper function for isPrime
uint64_t powmod(unsigned __int128 b, uint64_t e, uint64_t n){
    unsigned __int128 r = 1;
    b %= n;
    while(e){
        if(e&1){
            r = r*b%n;
        }
        e >>= 1;
        b = b*b%n;
    }
    return (uint64_t)r;
}

//uses deterministic Miller Rabin primality test
int isPrime(uint64_t n){
    uint64_t s, d;//s, d | 2^s*d = n - 1
    if(n%2 == 0){
        return n == 2;
    }
    --n;
    s = __builtin_ctzll(n);
    d = n>>s;
    ++n;
    for(uint64_t i = 0, a, x; i < DMR_PRIMES_C; ++i){
        a = DMR_PRIMES[i];
        if(a >= n){
            break;
        }
        x = powmod(a, d, n);
        if(x == 1 || x == n - 1){
            goto CONTINUE_WITNESSLOOP;
        }
        for(a = 0; a < s - 1; ++a){
            x = powmod(x, 2, n);
            if(x == 1){
                return 0;
            }
            if(x == n - 1){
                goto CONTINUE_WITNESSLOOP;
            }
        }
        return 0;
        CONTINUE_WITNESSLOOP:;
    }
    return 1;
}

void *produce(void *_moments){
    uint64_t *moments = _moments, n = *moments;//the output argument for the 0th moment serves as the input argument for the number to start checking for primes at
    *moments = 0;
    for(; n < MAX; n += 6*NUM_PRODUCER_PAIRS){//the producers are paired so one of every pair generates primes equal to 1 mod 6 and the other equal to 5 mod 6.  main generates 2 and 3 the only exceptions
        if(isPrime(n)){
            for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
                moments[i] += m;
            }
            if(!queue_insert(&Q, &spread_integer_ft, SPREAD_EMPLACE(n))){
                fprintf(stderr, "\e[1;31mERROR: Could not insert into queue.\e[0m\n");
                exit(EXIT_FAILURE);
            }
        }
    }
    __atomic_fetch_sub(&producing, 1, __ATOMIC_SEQ_CST);//this thread is done generating primes; reduce producing counter by 1
    return moments;
}

void *consume(void *_moments){
    uint64_t *moments = _moments;
    while(__atomic_load_n(&producing, __ATOMIC_SEQ_CST) || __atomic_load_n(&Q.full, __ATOMIC_SEQ_CST)){//busy loop while some threads are producing
        spread_integer xyz;
        if(queue_remove(&Q, &spread_integer_ft, &xyz)){
            uint64_t n = xyz.x | xyz.y | xyz.z;
            if(isPrime(n)){
                for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
                    moments[i] += m;
                }
            }else{
                fprintf(stderr, "\e[1;31mERROR: Generated a prime that fails deterministic Miller Rabin.\e[0m\n");
                exit(EXIT_FAILURE);
            }
        }
    }
    return moments;
}

int main(void){
    if(!queue_init(&Q, &spread_integer_ft, QUEUE_SIZE)){
        fprintf(stderr, "\e[1;31mERROR: Could not initialize queue.\e[0m\n");
        exit(EXIT_FAILURE);
    }
    pthread_t producers[NUM_PRODUCER_PAIRS << 1], consumers[NUM_CONSUMERS];
    uint64_t moments[(NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS + 1][NUM_MOMENTS] = {};//the 2 extras are because main produces the primes 2 and 3 and consumes primes the consumers leave behind
    for(size_t i = 0; i < NUM_CONSUMERS; ++i){//create consumers first to increase likelihood of causing bugs
        if(pthread_create(consumers + i, NULL, consume, moments[(NUM_PRODUCER_PAIRS << 1) + 1 + i])){
            fprintf(stderr, "\e[1;31mERROR: Could not create consumer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    for(size_t i = 0; i < NUM_PRODUCER_PAIRS; ++i){
        moments[i << 1][0] = 5 + 6*i;
        if(pthread_create(producers + (i << 1), NULL, produce, moments[i << 1])){
            fprintf(stderr, "\e[1;31mERROR: Could not create producer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
        moments[(i << 1) + 1][0] = 7 + 6*i;
        if(pthread_create(producers + (i << 1) + 1, NULL, produce, moments[(i << 1) + 1])){
            fprintf(stderr, "\e[1;31mERROR: Could not create producer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    for(uint64_t n = 2; n < 4; ++n){
        for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
            moments[NUM_PRODUCER_PAIRS << 1][i] += m;
        }
        if(!queue_insert(&Q, &spread_integer_ft, SPREAD_EMPLACE(n))){
            fprintf(stderr, "\e[1;31mERROR: Could not insert into queue.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    __atomic_fetch_sub(&producing, 1, __ATOMIC_SEQ_CST);
    uint64_t c = 0;
    for(size_t i = 0; i < NUM_CONSUMERS; ++i){//join consumers first to bait bugs.  Note consumers should not finish until the producing counter reaches 0
        void *_c;
        if(pthread_join(consumers[i], &_c)){
            fprintf(stderr, "\e[1;31mERROR: Could not join consumer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
        c += (uintptr_t)_c;
    }
    for(size_t i = 0; i < NUM_PRODUCER_PAIRS << 1; ++i){
        if(pthread_join(producers[i], NULL)){
            fprintf(stderr, "\e[1;31mERROR: Could not join producer thread.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    //this really should not be happening because the consumer threads only return after the producing counter reaches 0,
    //which only happens after all of the producer threads are done inserting items into the queue.
    if(Q.full){
        fprintf(stdout, "\e[1;31mWTF: Q.full != 0\nproducing == %d\e[0m\n", producing);
    }
    while(Q.full){
        spread_integer xyz;
        if(!queue_remove(&Q, &spread_integer_ft, &xyz)){
            fprintf(stderr, "\e[1;31mERROR: Could not remove from non empty queue.\e[0m\n");
            exit(EXIT_FAILURE);
        }
        uint64_t n = xyz.x | xyz.y | xyz.z;
        if(isPrime(n)){
            for(uint64_t m = 1, i = 0; i < NUM_MOMENTS; m *= n, ++i){
                moments[(NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS][i] += m;
            }
        }else{
            fprintf(stderr, "\e[1;31mERROR: Generated a prime that fails deterministic Miller Rabin.\e[0m\n");
            exit(EXIT_FAILURE);
        }
    }
    queue_destroy(&Q, &spread_integer_ft);
    for(uint64_t i = 0, p, c, j; i < NUM_MOMENTS; ++i){
        for(j = p = 0; j < (NUM_PRODUCER_PAIRS << 1) + 1; ++j){
            p += moments[j][i];
        }
        for(c = 0; j < (NUM_PRODUCER_PAIRS << 1) + 1 + NUM_CONSUMERS + 1; ++j){
            c += moments[j][i];
        }
        printf("Moment %"PRIu64" %"PRIu64" -> %"PRIu64"\n", i, p, c);
    }
}

然后我用

编译
gcc -o test_queue_pc queue.c test_queue_pc.c -Wall -std=c99 -g -O0 -pthread -fuse-ld=gold -flto -lm

为什么消费者线程 return 在 queue 之前是空的,即使他们等待生产者完成,当他们在 producing 上循环时,但是做正确的当他们循环 producing || Q.full?

时的事情

Why do the consumer threads return before the queue is empty even though they wait for the producers to be done, when they loop just on producing, but do the correct thing when they loop on producing || Q.full?

因为没有更多的生产者意味着不会有新的条目添加到队列中;它不是意味着队列已经是空的。

考虑生产者比消费者更快的情况。他们将他们的东西添加到队列中,然后退出。此时,队列中有项目,但活动生产者计数为零。如果消费者只检查是否有活跃的生产者,他们会错过队列中已经存在的项目。


需要注意的是检查

if ((active producers) || (items in queue))

这里是C99中正确的。 (|| 运算符在左侧计算之后有一个序列点。也就是说,右侧永远不会在左侧之前计算。)

如果你只检查活跃的生产者,你会错过生产者比消费者快的情况,并在队列中还有项目时退出。

如果您只检查队列中的项目,您会错过生产者仍在向队列中添加内容的情况。

如果你先检查队列是否为空,你就开始了一场比赛window。在消费者检查队列是否为空之后,但在消费者检查是否有活动的生产者之前,生产者可以将一个或多个项目添加到队列并退出。

您需要先查看是否有活跃的生产者。如果有活跃的生产者,并且队列现在是空的,那么消费者必须等待新的项目到达队列(直到活跃的生产者计数下降到零,或者新的项目到达队列。)如果没有活跃的生产者,消费者必须检查队列中是否有项目。没有活动的生产者意味着队列中不会出现新项目,但并不意味着队列已经为空。