What's wrong with this first readers-writers solution implementation in C using mutex?

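I am trying to implement the first readers-writers problem (readers' preference) in C. I am using mutex lock and unlock calls to ensure that no writer can access the shared data while a reader holds the lock, and that any reader can access it once the first reader holds the lock. Here is my code. The program never runs to completion, i.e. it never reaches the thread join part. I think I am hitting a deadlock somewhere, or I may have put the mutex lock and unlock calls in the wrong places.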

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <fcntl.h>
#include <sys/types.h>

#define FALSE 0
#define TRUE  1
#define SLOWNESS 30000
#define INVALID_ACCNO -99999
#define SIZE  100
#define WRITE_ITR 100000
#define READ_ITR  100000
#define MAX_BALANCE 1000000

typedef struct {
        int accno;
        float balance;
} account;

// sleep function
void rest() 
{
    usleep(100);
}

//Global shared data structure
account account_list[SIZE];             /* this is the data structure that the readers and writers will be accessing concurrently.*/

pthread_mutex_t rw_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t r_lock = PTHREAD_MUTEX_INITIALIZER;


/* Writer thread - will update the account_list data structure. 
   Takes as argument the seed for the srand() function. 
*/
void * writer_thr(void * arg) 
{
    printf("Writer thread ID %ld\n", pthread_self());   
    srand(*((unsigned int *) arg));     /* set random number seed for this writer */

    int i, j;
    int r_idx;
    unsigned char found;            /* For every update_acc[j], set to TRUE if found in account_list, else set to FALSE */
    account update_acc[WRITE_ITR];

    /* first create a random data set of account updates */
    for (i = 0; i < WRITE_ITR;i++) 
    {
        r_idx = rand() % SIZE;      /* a random number in the range [0, SIZE) */ 
        update_acc[i].accno = account_list[r_idx].accno;
        update_acc[i].balance = 1000.0 + (float) (rand() % MAX_BALANCE);
    }//end for

    /* open a writer thread log file */
    char thr_fname[64];
    snprintf(thr_fname, 64, "writer_%ld_thr.log", pthread_self());
    FILE* fd = fopen(thr_fname, "w");
    if (!fd) 
    {
        fprintf(stderr,"Failed to open writer log file %s\n", thr_fname);
        pthread_exit(&errno);
    }//end if

    /* The writer thread will now try to update the shared account_list data structure. 
       For each entry 'j' in the update_acc[] array, it will find the corresponding 
       account number in the account_list array and update the balance of that account
       number with the value stored in update_acc[j].   
    */

    int temp_accno;
    for (j = 0; j < WRITE_ITR;j++) {
        found = FALSE;
        for (i = 0; i < SIZE;i++) {
            if (account_list[i].accno == update_acc[j].accno) {
                found = 1;
                temp_accno = account_list[i].accno;
                pthread_mutex_lock(&rw_lock);
                account_list[i].accno = INVALID_ACCNO;
                account_list[i].balance = update_acc[j].balance;
                account_list[i].accno = temp_accno;
                rest();                 /* makes the write long duration - SO AS TO INTRODUCE LATENCY IN WRITE before going for next 'j' */
                pthread_mutex_unlock(&rw_lock);
                fprintf(fd, "Account number = %d [%d]: old balance = %6.2f, new balance = %6.2f\n",
                        account_list[i].accno, update_acc[j].accno, account_list[i].balance, update_acc[j].balance);

        }//end if
        if (!found)
            fprintf(fd, "Failed to find account number %d!\n", update_acc[j].accno);

    }   // end test-set for-loop
}
    fclose(fd);
    return NULL;
}

/* Reader thread - will read the account_list data structure. 
   Takes as argument the seed for the srand() function. 
*/
void * reader_thr(void *arg) 
{
    printf("Reader thread ID %ld\n", pthread_self());
    srand(*((unsigned int *) arg));   /* set random number seed for this reader */

    int i, j;
    int r_idx;
    unsigned char found;            /* For every read_acc[j], set to TRUE if found in account_list, else set to FALSE */
    account read_acc[READ_ITR];

    /* first create a random data set of account updates */
    for (i = 0; i < READ_ITR;i++) 
    {
        r_idx = rand() % SIZE;      /* a random number in the range [0, SIZE) */
        read_acc[i].accno = account_list[r_idx].accno;
        read_acc[i].balance = 0.0;      /* we are going to read in the value */
    }//end for

    /* open a reader thread log file */
    char thr_fname[64];
    snprintf(thr_fname, 64, "reader_%ld_thr.log", pthread_self());
    FILE *fd = fopen(thr_fname, "w");
    if (!fd) 
    {
        fprintf(stderr,"Failed to reader log file %s\n", thr_fname);
        pthread_exit(&errno);
    }//end if

    /* The reader thread will now try to read the shared account_list data structure.
       For each entry 'j' in the read_acc[] array, the reader will fetch the 
       corresponding balance from the account_list[] array and store in
       read_acc[j].balance. */
    for (j = 0; j < READ_ITR;j++) {
        /* Now read the shared data structure */
        found = FALSE;
        for (i = 0; i < SIZE;i++) {
            rest();
            if (account_list[i].accno == read_acc[j].accno) {
                found = TRUE;
                fprintf(fd, "Account number = %d [%d], balance read = %6.2f\n",
                            account_list[i].accno, read_acc[j].accno, read_acc[j].balance); 
                pthread_mutex_lock(&r_lock);
                if(j == 1)
                {
                    pthread_mutex_lock(&rw_lock);
                }
                pthread_mutex_unlock(&r_lock);
                read_acc[j].balance = account_list[i].balance;
                pthread_mutex_lock(&r_lock);
                if(j == READ_ITR -  1)
                {
                    pthread_mutex_unlock(&rw_lock);
                }
                pthread_mutex_unlock(&r_lock);

        }

        if (!found)
            fprintf(fd, "Failed to find account number %d!\n", read_acc[j].accno);
    }   // end test-set for-loop
}

    fclose(fd);
    return NULL;
}

/* populate the shared account_list data structure */
void create_testset() {
    time_t t;
    srand(time(&t));
    int i;  
    for (i = 0;i < SIZE;i++) {
        account_list[i].accno = 1000 + rand() % RAND_MAX;
        account_list[i].balance = 100 + rand() % MAX_BALANCE;
    }   
    return;
}


void usage(char *str) {
    printf("Usage: %s -r <NUM_READERS> -w <NUM_WRITERS>\n", str);
    return;
}


int main(int argc, char *argv[]) 
{
    time_t t;
    unsigned int seed;
    int i;

    int READ_THREADS;           /* number of readers to create */
    int WRITE_THREADS;          /* number of writers to create */

    if(argc <= 3)
    {
        usage("./rw");
        exit(EXIT_FAILURE);
    }   
    int opt;
    while((opt = getopt(argc, argv, "r:w:")) != -1)
    {
        switch(opt)     
        {
            case 'r':
                READ_THREADS = atoi(optarg);
                break;
            case 'w':
                WRITE_THREADS = atoi(optarg);
                break;
            default: 
                usage("./rw");
                exit(EXIT_FAILURE);
        }
    }



    pthread_t* reader_idx = (pthread_t *) malloc(sizeof(pthread_t) * READ_THREADS);     /* holds thread IDs of readers */
    pthread_t* writer_idx  = (pthread_t *) malloc(sizeof(pthread_t) * WRITE_THREADS);       /* holds thread IDs of writers */

    /* create readers */
    for (i = 0;i < READ_THREADS;i++) 
        {
            seed = (unsigned int) time(&t);
            if((pthread_create(&reader_idx[i], NULL, reader_thr, &seed)) != 0)
            {
                perror("pthread reader create");
                exit(-1);
            }
        }
    printf("Done creating reader threads!\n");

    /* create writers */ 
    for (i = 0;i < WRITE_THREADS;i++) 
    {
        seed = (unsigned int) time(&t);
        /* YOUR CODE GOES HERE */
        if((pthread_create(&writer_idx[i], NULL, writer_thr, &seed)) != 0)
        {
            perror("pthread writer create");
            exit(-1);
        }

    }
    printf("Done creating writer threads!\n");

    /* Join all reader and writer threads. 
    */
    for(i = 0; i < READ_THREADS; i++)
    {
        pthread_join(reader_idx[i], NULL);
    }
    for(i = 0; i < WRITE_THREADS; i++)
    {
        pthread_join(writer_idx[i], NULL);
    }

    printf("Reader threads joined.\n");

    printf("Writer threads joined.\n");

    pthread_mutex_destroy(&r_lock);
    pthread_mutex_destroy(&rw_lock);

    return 0;
}

Your code is a mess. There are several things wrong with it, each of which breaks the RW locking mechanism you are trying to implement.

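  • Both your reader threads and your writer threads need to deal with reader exclusion and writer exclusion. Your current code completely ignores reader exclusion in the writer thread.
  • Your writer thread is reading from the shared structure (if (account_list[i].accno == update_acc[j].accno)) without excluding other writers.
  • I don't think this can be implemented with just mutexes, as you appear to be trying to do. For example, the last reader thread to leave the critical region needs to be able to let waiting writers go. You probably need at least condition variables or semaphores to do this.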
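To make the last point concrete, here is a minimal sketch of a reader-preference lock built from one mutex plus one condition variable. It is only an illustration under stated assumptions: the names rp_lock, read_lock, write_lock, shared_value and run_condvar_demo are hypothetical and do not come from the question, and a plain counter stands in for account_list. Note that entry is decided by a shared count of active readers, not by the loop index j as in the question's reader.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

/* Hypothetical reader-preference lock; none of these names come from the question. */
typedef struct {
    pthread_mutex_t m;        /* protects the two counters below */
    pthread_cond_t  changed;  /* broadcast whenever the lock may have become free */
    int readers;              /* readers currently inside */
    int writer;               /* 1 while a writer is inside */
} rp_lock;

static rp_lock lock = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0 };
static long shared_value;     /* stand-in for account_list */

static void read_lock(rp_lock *l)
{
    pthread_mutex_lock(&l->m);
    while (l->writer)                       /* readers wait only for an active writer */
        pthread_cond_wait(&l->changed, &l->m);
    l->readers++;
    pthread_mutex_unlock(&l->m);
}

static void read_unlock(rp_lock *l)
{
    pthread_mutex_lock(&l->m);
    if (--l->readers == 0)                  /* last reader out wakes waiting writers */
        pthread_cond_broadcast(&l->changed);
    pthread_mutex_unlock(&l->m);
}

static void write_lock(rp_lock *l)
{
    pthread_mutex_lock(&l->m);
    while (l->writer || l->readers > 0)     /* a writer needs exclusive access */
        pthread_cond_wait(&l->changed, &l->m);
    l->writer = 1;
    pthread_mutex_unlock(&l->m);
}

static void write_unlock(rp_lock *l)
{
    pthread_mutex_lock(&l->m);
    l->writer = 0;
    pthread_cond_broadcast(&l->changed);    /* wake waiting readers and writers */
    pthread_mutex_unlock(&l->m);
}

static void *writer(void *arg)
{
    (void)arg;
    for (int i = 0; i < 1000; i++) {
        write_lock(&lock);
        shared_value++;
        write_unlock(&lock);
    }
    return NULL;
}

static void *reader(void *arg)
{
    long *last_seen = arg;
    for (int i = 0; i < 1000; i++) {
        read_lock(&lock);
        *last_seen = shared_value;
        read_unlock(&lock);
    }
    return NULL;
}

/* Runs two writers and two readers; returns the final counter value. */
long run_condvar_demo(void)
{
    pthread_t w[2], r[2];
    long seen[2] = { 0, 0 };
    shared_value = 0;
    for (int i = 0; i < 2; i++) {
        int rc = pthread_create(&r[i], NULL, reader, &seen[i]);
        assert(rc == 0);
        rc = pthread_create(&w[i], NULL, writer, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < 2; i++) {
        pthread_join(r[i], NULL);
        pthread_join(w[i], NULL);
    }
    return shared_value;
}
```

Calling run_condvar_demo() from main (built with -pthread) should return 2000 if no increment is lost. Because readers wait only for an active writer and never for a waiting one, a steady stream of readers can starve writers, which is the expected trade-off of the readers' preference variant.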

My advice is to use the POSIX pthread_rwlock_init and friends instead.
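As a rough illustration of that advice, the sketch below guards a small account table with a pthread_rwlock_t. The helper names (add_to_balance, read_balance, run_rwlock_demo), the table size and the account numbers are assumptions made for the example, not part of the question's program. The writer searches and updates under a single write lock, which also addresses the unprotected comparison mentioned above.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

#define N_ACCOUNTS 4          /* hypothetical table size for the example */

typedef struct { int accno; float balance; } account;  /* same shape as in the question */

static account accounts[N_ACCOUNTS];
static pthread_rwlock_t accounts_lock = PTHREAD_RWLOCK_INITIALIZER;

/* Search and update under one write lock, so no thread sees a half-done update. */
static int add_to_balance(int accno, float amount)
{
    int found = 0;
    pthread_rwlock_wrlock(&accounts_lock);
    for (int i = 0; i < N_ACCOUNTS; i++) {
        if (accounts[i].accno == accno) {
            accounts[i].balance += amount;
            found = 1;
            break;
        }
    }
    pthread_rwlock_unlock(&accounts_lock);
    return found;
}

/* Several readers may hold the read lock at the same time. */
static int read_balance(int accno, float *out)
{
    int found = 0;
    pthread_rwlock_rdlock(&accounts_lock);
    for (int i = 0; i < N_ACCOUNTS; i++) {
        if (accounts[i].accno == accno) {
            *out = accounts[i].balance;
            found = 1;
            break;
        }
    }
    pthread_rwlock_unlock(&accounts_lock);
    return found;
}

static void *writer(void *arg)
{
    int accno = *(int *)arg;
    for (int i = 0; i < 1000; i++)
        add_to_balance(accno, 1.0f);
    return NULL;
}

static void *reader(void *arg)
{
    int accno = *(int *)arg;
    float balance = 0.0f;
    for (int i = 0; i < 1000; i++)
        read_balance(accno, &balance);
    return NULL;
}

/* Two writers each add 1.0 a thousand times to account 1000; returns its final balance. */
float run_rwlock_demo(void)
{
    pthread_t w[2], r[2];
    int accno = 1000;         /* never modified while the threads run */
    float final_balance = -1.0f;
    for (int i = 0; i < N_ACCOUNTS; i++) {
        accounts[i].accno = 1000 + i;
        accounts[i].balance = 0.0f;
    }
    for (int i = 0; i < 2; i++) {
        pthread_create(&r[i], NULL, reader, &accno);
        pthread_create(&w[i], NULL, writer, &accno);
    }
    for (int i = 0; i < 2; i++) {
        pthread_join(r[i], NULL);
        pthread_join(w[i], NULL);
    }
    read_balance(accno, &final_balance);
    return final_balance;
}
```

One caveat: POSIX does not promise readers' preference for pthread_rwlock_t, so whether waiting writers or arriving readers win depends on the implementation and scheduling. If strict reader preference is a requirement of the exercise, a hand-built lock is still needed.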

If you insist on doing this yourself, then please at least read the paper Concurrent Control with "Readers" and "Writers" for inspiration on how to implement it.
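For orientation, the classic semaphore formulation of the readers' preference scheme can be sketched roughly as follows. This is an approximation written for this thread, not code taken from the paper: the names rw_sem, count_lock, active_readers and run_semaphore_demo are hypothetical, a counter stands in for account_list, and it assumes a platform with unnamed POSIX semaphores (sem_init), such as Linux. A semaphore works where a pthread mutex does not because the reader that posts it may be a different thread from the one that waited on it, whereas a mutex has to be unlocked by the thread that locked it.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

/* Hypothetical names: none of these appear in the question's program. */
static sem_t rw_sem;          /* held by one writer, or by the readers as a group */
static pthread_mutex_t count_lock = PTHREAD_MUTEX_INITIALIZER; /* protects active_readers */
static int active_readers;
static long shared_value;     /* stand-in for account_list */

static void reader_enter(void)
{
    pthread_mutex_lock(&count_lock);
    if (++active_readers == 1)    /* first reader in locks writers out */
        sem_wait(&rw_sem);
    pthread_mutex_unlock(&count_lock);
}

static void reader_exit(void)
{
    pthread_mutex_lock(&count_lock);
    if (--active_readers == 0)    /* last reader out lets writers in again */
        sem_post(&rw_sem);
    pthread_mutex_unlock(&count_lock);
}

static void *writer(void *arg)
{
    (void)arg;
    for (int i = 0; i < 1000; i++) {
        sem_wait(&rw_sem);        /* exclusive access for the whole update */
        shared_value++;
        sem_post(&rw_sem);
    }
    return NULL;
}

static void *reader(void *arg)
{
    long *last_seen = arg;
    for (int i = 0; i < 1000; i++) {
        reader_enter();
        *last_seen = shared_value;
        reader_exit();
    }
    return NULL;
}

/* Runs two writers and two readers; returns the final counter value. */
long run_semaphore_demo(void)
{
    pthread_t w[2], r[2];
    long seen[2] = { 0, 0 };
    int rc = sem_init(&rw_sem, 0, 1);   /* binary semaphore, initially free */
    assert(rc == 0);
    (void)rc;
    shared_value = 0;
    active_readers = 0;
    for (int i = 0; i < 2; i++) {
        pthread_create(&r[i], NULL, reader, &seen[i]);
        pthread_create(&w[i], NULL, writer, NULL);
    }
    for (int i = 0; i < 2; i++) {
        pthread_join(r[i], NULL);
        pthread_join(w[i], NULL);
    }
    sem_destroy(&rw_sem);
    return shared_value;
}
```

The sketch ignores the possibility of sem_wait being interrupted by a signal, which is acceptable for a demo with no signal handlers but would need a retry loop in production code.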