让我发疯的 Pthread 数据竞赛

Pthread data race that drives me crazy

几周前,我 post 提出了一个与此类似的问题,当时我在使用 C 中的 pthreads 的 N-queens 程序中找不到数据竞争时遇到了问题。Why is my multithreaded C program not working on macOS, but completely fine on Linux?

我在 post 的评论部分得到了一些建议,我真的尽力根据它们进行更正。我听了几天建议,更改了一些部分,但数据竞争仍然存在,我就是不明白为什么。关键部分内有用于生产和消费数量的计数器。在查看代码和分析代码时,我感到完全失明,我知道消耗太多但是围绕该代码片段的同步 应该 以我的知识是正确的,但显然有些东西不正确正确的。非常感谢外部输入。

这是我正在使用的代码,我不确定如何减小其大小以重现问题。我使用 MacBook Pro 2020 x86_64 架构在 macOS Monterey (12.1) 上使用 gcc (clang-1205.0.22.11) 编译它。

编译:gcc -o 8q 8q.c*
运行: ./8q <consumers> <N>, NxN 个棋盘,N 个皇后放置
参数:./8q 2 4 足以突出问题(应该产生 2 个解决方案,但每隔 运行 产生 3+ 个解决方案,即存在重复的解决方案
注意:运行将程序与 ./8q 2 4 结合应该给出 2 个解决方案,1820 个生产和 1820 个消费。


#ifndef _REENTRANT
    #define _REENTRANT
#endif

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>


typedef struct stack_buf {
    int positions[8];
    int top;
} stack_buf;

typedef struct global_buf {
    int positions[8];
    int buf_empty;
    long prod_done;
    int last_done;
} global_buf;

typedef struct print_buf {
    int qpositions[100][8];
    int top;
} print_buf;

stack_buf queen_comb = { {0}, 0 };
print_buf printouts = { {{0}}, 0 };
global_buf global = { {0}, 1, 0, 0 };
int N; //NxN board and N queens to place
long productions = 0;
long consumptions = 0;

pthread_mutex_t buffer_mutex, print_mutex;
pthread_cond_t empty, filled;


/* ##########################################################################################
   ################################## VALIDATION FUNCTIONS ##################################
   ########################################################################################## */

/* Validate that no queens are placed on the same row */
int valid_rows(int qpositions[]) {
    int rows[N];
    memset(rows, 0, N*sizeof(int));
    int row;
    for (int i = 0; i < N; i++) {
        row = qpositions[i] / N;
        if (rows[row] == 0) rows[row] = 1;
        else return 0;
    }
    return 1;
}

/* Validate that no queens are placed in the same column */
int valid_columns(int qpositions[]) {
    int columns[N];
    memset(columns, 0, N*sizeof(int));
    int column;
    for (int i = 0; i < N; i++) {
        column = qpositions[i] % N;
        if (columns[column] == 0) columns[column] = 1;
        else return 0;
    }
    return 1;
}

/* Validate that left and right diagonals aren't used by another queen */
int valid_diagonals(int qpositions[]) {
    int left_bottom_diagonals[N];
    int right_bottom_diagonals[N];
    int row, col, temp_col, temp_row, fill_value, index;

    for (int queen = 0; queen < N; queen++) {
        row = qpositions[queen] / N;
        col = qpositions[queen] % N;
        
        /* position --> left down diagonal endpoint (index) */
        fill_value = col < row ? col : row; // closest to bottom or left wall
        temp_row = row - fill_value;
        temp_col = col - fill_value;
        index = temp_row * N + temp_col; // board position
        for (int i = 0; i < queen; i++) { // check if interference occurs
            if (left_bottom_diagonals[i] == index) return 0;
        }
        left_bottom_diagonals[queen] = index; // no interference

        /* position --> right down diagonal endpoint (index) */
        fill_value = (N-1) - col < row ? N - col - 1 : row; // closest to bottom or right wall
        temp_row = row - fill_value;
        temp_col = col + fill_value;
        index = temp_row * N + temp_col; // board position
        for (int i = 0; i < queen; i++) { // check if interference occurs
            if (right_bottom_diagonals[i] == index) return 0;
        }
        right_bottom_diagonals[queen] = index; // no interference
    }
    return 1;
}

/* ##########################################################################################
   #################################### HELPER FUNCTION(S) ####################################
   ########################################################################################## */

/* print the collected solutions  */
void print(print_buf printouts) {
    static int solution_number = 1;
    int placement;

    pthread_mutex_lock(&print_mutex);
    for (int sol = 0; sol < printouts.top; sol++) { // all solutions
        printf("Solution %d: [ ", solution_number++);
        for (int pos = 0; pos < N; pos++) {
            printf("%d ", printouts.qpositions[sol][pos]+1);
        } 
        printf("]\n");

        printf("Placement:\n");
        for (int i = 1; i <= N; i++) { // rows
            printf("[ ");
            placement = printouts.qpositions[sol][N-i];
            for (int j = (N-i)*N; j < (N-i)*N+N; j++) { // physical position
                if (j == placement) {
                    printf(" Q ");
                } else printf("%2d ", j+1);
            }
            printf("]\n");
        }
        printf("\n");
    }
    pthread_mutex_unlock(&print_mutex);
}


/* ##########################################################################################
   #################################### THREAD FUNCTIONS ####################################
   ########################################################################################## */

/* entry point for each worker (consumer) workers will 
check each queen's row, column and diagonal to evaluate 
satisfactory placements */
void *eval_positioning(void *id) {
    long thr_id = (long)id;
    int qpositions[N]; // on stack (thread-private)
    
    while (1) {
        pthread_mutex_lock(&buffer_mutex);
        while (global.buf_empty == 1) { // no element
            if (global.last_done) { // last done, no combinations left 
                pthread_mutex_unlock(&buffer_mutex);
                pthread_cond_signal(&filled);
                return NULL;
            }
            if (global.prod_done) {
                global.last_done = 1;
                break;
            }
            pthread_cond_wait(&filled, &buffer_mutex);
        }

        memcpy(qpositions, global.positions, N*sizeof(int)); // copy to local scope
        global.buf_empty = 1;
        consumptions++;

        pthread_mutex_unlock(&buffer_mutex);
        pthread_cond_signal(&empty);
        
        if (valid_rows(qpositions) && valid_columns(qpositions) && valid_diagonals(qpositions)) {
            pthread_mutex_lock(&print_mutex);
            memcpy(printouts.qpositions[printouts.top], qpositions, N*sizeof(int));  /* save for printing later */
            printouts.top++;
            pthread_mutex_unlock(&print_mutex);
        }
    }
    return NULL;
}

/* recursively generate all possible queen_combs */
void rec_positions(int pos, int queens) {
    if (queens == 0) { // base case
        pthread_mutex_lock(&buffer_mutex);
        while (global.buf_empty == 0) { // while production hasn't been consumed
            pthread_cond_wait(&empty, &buffer_mutex);
        }
        memcpy(global.positions, queen_comb.positions, N*sizeof(int));
        productions++;
        global.buf_empty = 0;
        pthread_mutex_unlock(&buffer_mutex);
        pthread_cond_signal(&filled);
        return;
    }

    for (int i = pos; i <= N*N - queens; i++) {
        queen_comb.positions[queen_comb.top++] = i;
        rec_positions(i+1, queens-1);
        queen_comb.top--;
    }
}

/* binomial coefficient | without order, without replacement
8 queens on 8x8 board: 4'426'165'368 queen combinations */
void *generate_positions(void *arg) {
    rec_positions(0, N);
    pthread_mutex_lock(&buffer_mutex);
    global.prod_done = 1;
    pthread_mutex_unlock(&buffer_mutex);
    pthread_cond_broadcast(&filled); //wake all to 
    return NULL;
}

/* ##########################################################################################
   ########################################## MAIN ##########################################
   ########################################################################################## */

/* main procedure of the program */
int main(int argc, char *argv[]) {
    if (argc < 3) {
        printf("usage: ./8q <workers> <board width/height>\n");
        exit(1);
    }

    int workers = atoi(argv[1]);
    N = atoi(argv[2]);

    if (N < 2 || N > 8) {
        printf("Wrong input! 2 <= N <= 8\n");
        return 0;
    }

    struct timeval start, stop;
    double elapsed;
    pthread_t consumers[workers];
    pthread_t producer;

    printf("\n");
    
    gettimeofday(&start, NULL);

    pthread_create(&producer, NULL, generate_positions, NULL);
    for (long i = 0; i < workers; i++) {
        pthread_create(&consumers[i], NULL, eval_positioning, (void*)i+1);
    }

    pthread_join(producer, NULL);
    for (int i = 0; i < workers; i++) {
        pthread_join(consumers[i], NULL);
        char id[2];
        sprintf(id, "%d", i+1);
        write(1, id, strlen(id));
        write(1, " done\n\n", 6);
    }

    gettimeofday(&stop, NULL);
    elapsed = stop.tv_sec - start.tv_sec;
    elapsed += (stop.tv_usec - start.tv_usec) / (double)1000000;
    
    /* go through all valid solutions and print */
    print(printouts);
    
    printf("board: %dx%d, workers: %d (+1), exec time: %fs, solutions: %d\n", N, N, workers, elapsed, printouts.top);
    printf("productions:  %ld\nconsumptions: %ld\n", productions, consumptions);
    return 0;
}

您没有初始化互斥量和条件变量。在 pthread API 中使用时结果为 UB。有两种方法,最简单的就是使用正确的初始化器:

pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t print_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t filled = PTHREAD_COND_INITIALIZER;

无关,但值得一提的是,last_done意识形态是没有必要的。这可以通过 buf_empty 和 prod_done 状态来完成。具体来说:

void *eval_positioning(void *tid)
{
    int qpositions[N]; // on stack (thread-private)

    while (1)
    {
        pthread_mutex_lock(&buffer_mutex);

        // while still producing *and* the buffer is empty
        while (!global.prod_done && global.buf_empty)
            pthread_cond_wait(&filled, &buffer_mutex);

        // if both are true, we're done. nothing to process, and
        //  there never will be (e.g. prod_done)
        if (global.prod_done && global.buf_empty)
        {
            // signal anyone else waiting on that mutex+cvar
            pthread_cond_signal(&filled);
            break;
        }

        // if we have a buffer to process (even if prod_done is true)
        else if (!global.buf_empty)
        {
            // make local copy of buffer
            memcpy(qpositions, global.positions, sizeof qpositions);
            ++consumptions;

            // mark global buffer as ready-to-receive
            global.buf_empty = 1;
            pthread_cond_signal(&empty);
            pthread_mutex_unlock(&buffer_mutex);

            // if validated...
            if (valid_rows(qpositions) && valid_columns(qpositions) && valid_diagonals(qpositions))
            {
                // record and bump the printout counter.
                pthread_mutex_lock(&print_mutex);
                int row = printouts.top++;
                pthread_mutex_unlock(&print_mutex);

                // this need not be protected by the mutex. we "own"
                //  this now, and can just blast away.
                memcpy(printouts.qpositions[row], qpositions, sizeof qpositions);
            }
        }
        else
        {
            pthread_mutex_unlock(&buffer_mutex);
        }
    }

    // make sure we unlock this
    pthread_mutex_unlock(&buffer_mutex);

    return tid;
}

正确初始化并发材料和上述 eval 处理器后,这是一致的输出:

输出

1 done
2 done
Solution 1: [ 2 8 9 15 ]
Placement:
[ 13 14  Q 16 ]
[  Q 10 11 12 ]
[  5  6  7  Q ]
[  1  Q  3  4 ]

Solution 2: [ 3 5 12 14 ]
Placement:
[ 13  Q 15 16 ]
[  9 10 11  Q ]
[  Q  6  7  8 ]
[  1  2  Q  4 ]

board: 4x4, workers: 2 (+1), exec time: 0.013001s, solutions: 2
productions:  1820
consumptions: 1820

对微不足道的笔记本电脑性能数字表示歉意