Posix 线程消费者无法正确打印

Posix thread consumer's get not printing properly

我的程序应该使用生产者线程将文件中的行读入缓冲区,一次最多容纳 numlines,然后使用消费者线程将文件中的单词添加到链表中根据单词的字符数是奇数还是偶数,如果单词不存在则将其附加到列表中,如果单词已经在列表中则将其添加到计数中。输出将是奇数列表和偶数列表,其中包含单词以及它们在文件中出现的频率。我在 Mint 运行 Linux

该程序的用法类似于./exec -b numlines -t maxcounters -d filedelay -D threaddelay file,所以我现在使用./exec -b 4 -t 1 -d 0 -D 0 file。当带有附加文件的程序时,我看到我的生产者线程正确地放入了文件的 4 行,但是消费者的 get 似乎每次都 return 不同的输出。我对 pthreads 很陌生,有人可以告诉我我的 get 有什么问题吗?

#define _GNU_SOURCE 
#include <features.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <limits.h>
#include <time.h>

//globals
int numlines;
int maxcounters;
int filedelay;
int threaddelay;
#define MAXSIZE 2048
#define OVER (-1)
char* lastLine;

/* Circular buffer of integers. */
struct prodcons
{
  char** data;  /* the actual data */
  pthread_mutex_t lock;     /* mutex ensuring exclusive access to buffer */
  int readpos, writepos;    /* positions for reading and writing */
  pthread_cond_t notempty;  /* signaled when buffer is not empty */
  pthread_cond_t notfull;   /* signaled when buffer is not full */
};

//linked lists for odd and even
struct node  
{
  char *word;
  int timesAppeared;
  struct node *next;
};

int checkargs(int argc, char* argv[]);
static void init (struct prodcons *b);
static void put (struct prodcons *b, char* line);
static char *get (struct prodcons *b);
static void *producer (void *args);
static void *consumer (void *args);
static void addWord (char *word, struct node **head);

//buffer and lists
struct prodcons buffer;
struct node *odd = NULL;
struct node *even = NULL;

//mutexes for list
pthread_mutex_t oddlock;
pthread_mutex_t evenlock;

int main(int argc, char* argv[]){
    if (checkargs(argc, argv) == 1){
        printf("CORRECT ARGS!\n");
    }
    else{
        exit(1);
    }
    int filecount;
    filecount = argc - 9;
    pthread_t th_a, th_b;
    void *retval;
    init(&buffer);

    //create threads
    if(pthread_create (&th_a, NULL, producer, argv) != 0){
        fprintf(stderr, "Error: producer create problem\n");
        exit(1);
    }
    if(pthread_create (&th_b, NULL, consumer, argv) != 0){
        fprintf(stderr, "Error: consumer create problem\n");
        exit(1);
    }
    /* Wait until producer and consumer finish. */
    if(pthread_join(th_a, &retval) != 0){
        fprintf(stderr, "ERROR: producer join problem\n");
        exit(1);
    }
    printf("hey121342343\n");
    if(pthread_join(th_b, &retval) != 0){
        fprintf(stderr, "ERROR: consumer join problem\n");
        exit(1);
    }   
    printf("hey123\n");
    struct node *ptr;
    printf("\nODD LIST BEGIN\n");
    printf("%s %s\n", "Word", "# appeared");
    for( ptr = odd; ptr != NULL; ptr = ptr->next){
        printf("%s %d\n", ptr->word, ptr->timesAppeared);
    }

    printf("\nEVEN LIST BEGIN\n");
    printf("%s %s\n", "Word", "# appeared");
    for( ptr = even; ptr != NULL; ptr = ptr->next){
        printf("%s %d\n", ptr->word, ptr->timesAppeared);
    }

    return 0;
} 

static void addWord (char* word, struct node **head){


    struct node *headnode;
    struct node *addnode;
    struct node *lastnode;

    //check for word
    for(headnode = *head; headnode != NULL; headnode = headnode->next){
        if(strcmp(headnode->word, word) == 0){
            //same
            printf("SAME WORD!\n");
            headnode -> timesAppeared++;
            return;
        }
    }

    //not in list? make a new node
    addnode = (struct node *) malloc(sizeof(struct node));
    addnode -> word = malloc(strlen(word) + 1);
    strcpy(addnode->word, word);
    printf("the word %s has been added!\n", addnode->word);
    addnode-> timesAppeared = 1;
    addnode->next = NULL;

    //insert into linked list
    headnode = *head;
    lastnode = NULL;

//possible debug, try taking out the strcmp condition
//(strcmp(addnode->word, headnode->word) > 0)
    //traverse to end
    while(( headnode != NULL)){
        lastnode = headnode;
        headnode = headnode->next;
    }
    addnode->next = headnode;

    //in case of empty list
    if(lastnode == NULL){
        *head = addnode;
    }
    else{
        lastnode ->next = addnode;
    }
    return;
}

/* Initialize a buffer */
static void init (struct prodcons *b)
{
    //make a buffer of correct size and check
    b->data = malloc(sizeof(char *) * numlines);
    if (b->data == NULL){
        perror("failed buffer malloc");
        exit(1);
    }
    pthread_mutex_init (&b->lock, NULL);
    pthread_cond_init (&b->notempty, NULL);
    pthread_cond_init (&b->notfull, NULL);
    b->readpos = 0;
    b->writepos = 0;
}

/* Store an integer in the buffer */
static void put (struct prodcons *b, char* line)
{
    pthread_mutex_lock (&b->lock);
    /* Wait until buffer is not full */
    while ((b->writepos + 1) % numlines == b->readpos)
    {
      pthread_cond_wait (&b->notfull, &b->lock);
      /* pthread_cond_wait reacquired b->lock before returning */
    }
    /* Write the data and advance write pointer */
    printf("THIS IS THE READLINE in PUT %s\n", line);
    b->data[b->writepos] = line;
    printf("AFTER PUT command: %s is at WRITEPOS %d\n", b->data[b->writepos], b->writepos);
    b->writepos++;
    if (b->writepos >= numlines){
        b->writepos = 0;
    }


    /* Signal that the buffer is now not empty */
    pthread_cond_signal (&b->notempty);
    pthread_mutex_unlock (&b->lock);
}


/* Read and remove an integer from the buffer */
static char* get (struct prodcons *b)
{
    char* line;
    pthread_mutex_lock (&b->lock);

    /* Wait until buffer is not empty */
    while (b->writepos == b->readpos)
    {
      pthread_cond_wait (&b->notempty, &b->lock);
    }
    /* Read the data and advance read pointer */
    printf("THIS IS THE LINE in GET%s\n", b->data[b->readpos]);
    line = b->data[b->readpos];
    printf("AFTER GET command: %s is at READPOS %d\n", b->data[b->readpos], b->readpos);
    b->readpos++;
    if (b->readpos >= numlines){
        b->readpos = 0;
    }

    /* Signal that the buffer is now not full */
    pthread_cond_signal (&b->notfull);
    pthread_mutex_unlock (&b->lock);

    return line;
}

/* A test program: one thread inserts integers from 1 to 10000,
   the other reads them and prints them. */



static void *producer (void *args)
{
    FILE *file;
    char readline[MAXSIZE];
    char **argv = args;
    int filePos = 9;

    //go through every file
    while (argv[filePos] != NULL){
        printf("This is the file opened: %s", argv[filePos]);
        file = fopen(argv[filePos], "r");

        //read file lines
        while(fgets(readline, MAXSIZE, file) == readline){

            printf("THIS IS THE READLINE in PRODUCER%s", readline);
            put(&buffer, readline);

        }
        // lastLine = &readline;
        // printf("THIS IS THE LAST READLINE: %s\n", lastLine);

        fclose(file);
        filePos+= 1;
    }//endwhile
    // need a way for the consumer to know when the thread has finished
    put(&buffer, "endofile");
    // int a;
    // for(a = 0; a < ccount; a++){
    //  put(NULL);
    // }

  printf("-------------PRODUCER FINISHED FILLING BUFFER-------------\n");
    return NULL;  
}

static void *consumer (void *args)
{
    char name[2] = "a";
    char *removedLine;
    char* token;
    //get line
    removedLine = get(&buffer);
    while(removedLine != "endofile"){
        printf("This is the removedline: %s", removedLine);
        token = strtok_r(removedLine, " ", &removedLine);
        printf("This is the TOKEN %s and its length: %zu\n", token, strlen(token));
        //process each word
        while(token != NULL){

            //odd
            if( (strlen(token) - 1) % 2 == 1){
                pthread_mutex_lock(&oddlock);
                printf("adding the word in odd !\n");
                addWord(token, &odd);
                pthread_mutex_unlock(&oddlock);
            }
            //even
            if( (strlen(token) - 1) % 2 == 0){
                pthread_mutex_lock(&evenlock);
                printf("adding the word in even!\n");
                addWord(token, &even);
                pthread_mutex_unlock(&evenlock);
            }
            //subsequent call, has to make the first arg null from the man page
            token = strtok_r(NULL, " ", &removedLine);
        }//inner while
        // printf("This is thread A!\n");
        removedLine = get(&buffer);
        printf("This is the next line, after the while: %s", removedLine);
    }//outer while
    printf("-------------CONSUMER FINISHED FILLING BUFFER-------------\n");

    return;
}
// //need to make 2 lists for odd and even
// struct linkedlist{
//   struct node *start;
//   struct node *end;
// };

// struct list odd;
// struct list even;

int checkargs(int argc, char* argv[]){
    //have checkers to see if the program went through all the args properly 
    int checknumlines;
    int checkmaxcounters;
    int checkfiledelay;
    int checkthreaddelay;
    printf("ARGC %d\n", argc);
    //10 args or more (depending on files)
    if(argc <= 9){
        fprintf(stderr, "Error: Please enter correct number of args. \n");
        return 0;
    }
    //grab data
    int i;

    for(i = 0; i < (argc - 1); i++){
        //travel through until -b, get that info of the next
        if(strcmp(argv[i], "-b") == 0){
            numlines = atoi(argv[i+1]);
            if (numlines <= 0){
                fprintf(stderr, "Error: numlines is not a positive non-zero integer. \n");
                return 0;
            }
            checknumlines = -1;
        }
        //travel through until -t, get that info of the next
        if(strcmp(argv[i], "-t") == 0){
            maxcounters = atoi(argv[i +1]);
            if((maxcounters <= 0) || (maxcounters > 26)){
                fprintf(stderr, "Error: maxcounters needs to be a positive non-zero integer less than or equal to 26. \n");
                return 0;
            }
            checkmaxcounters = -1;
        }

        //travel through until -d, get that info of the next
        if(strcmp(argv[i], "-d") == 0){
            filedelay = atoi(argv[i+1]);
            if(filedelay < 0 ){
                fprintf(stderr, "Error: filedelay needs to be a positive integer. \n");
                return 0;
            }
            checkfiledelay = -1;
        }

        //travel through until -D, get that info of the next
        if(strcmp(argv[i], "-D") == 0){
            threaddelay = atoi(argv[i+1]);
            if(threaddelay < 0 ){
                fprintf(stderr, "Error: threaddelay needs to be a positive integer. \n");
                return 0;
            }
            checkthreaddelay = -1;
        }
    }
    printf("CHECKS: This is numlines: %d, This is maxcounters: %d, This is filedelay: %d, This is threaddelay: %d \n", checknumlines, checkmaxcounters, checkfiledelay, checkthreaddelay);

    if(checknumlines != -1 || checkmaxcounters != -1|| checkfiledelay != -1|| checkthreaddelay!= -1){
        fprintf(stderr, "Error: incorrect args.\n");
        return 0;
    }

    printf("This is numlines: %d, This is maxcounters: %d, This is filedelay: %d, This is threaddelay: %d \n", numlines, maxcounters, filedelay, threaddelay);


    return 1;
}

我用来检查的文件:

a
aa
aaa
aaaa
aaaaa
aaaaaa
aaaaaaa
aaaaaaaa
aaaaaaaaa
aaaaaaaaaa
aaaaaaaaaaa
aaaaaaaaaaaa
aaaaaaaaaaaaa
aaaaaaaaaaaaaa
aaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaa

3 次运行后的输出(减去调试语句) 1.

 ODD LIST BEGIN
    Word # appeared
    aaaaa
     3
    aaaaaaaaa
     1
    aaaaaaaaaaa
     1
    a
     1
    aaaaaaaaaaaaa
     2
    aaaaaaaaaaaaaaa
     1
    aaaaaaaaaaaaaaaaa
     4

    EVEN LIST BEGIN
    Word # appeared
    aaaaa
     1

     2
    aaaaaa
     1
    aaaaaaaaaaaa
     2
    aaaaaaaaaaaaaaaa
     1

2.

ODD LIST BEGIN
Word # appeared
aaaaa
 4
aaaaaaaaa
 3
aaaaaaaaaaaaaaa
 1
aaaaaaaaaaaaaaaaa
 4

EVEN LIST BEGIN
Word # appeared

 3
aaaaaaaa
 1
aaaaaaaaaaaa
 1
aaaaaaaaaaaaaa
 2
aaaaaaaaaaaaaaaa
 1

3.

ODD LIST BEGIN
Word # appeared
aaaaa
 2
aaaaaaa
 1
aaaaaaaaa
 1
aaaaaaaaaaa
 1
aaaaaaaaaaaaa
 1
aaaaaaaaaaaaaaa
 1
aaaaaaaaaaaaaaaaa
 4

EVEN LIST BEGIN
Word # appeared

 2
aaaaaa
 1
aaaaaaaa
 1
aaaaaaaaaa
 1
aaaaaaaaaaaa
 1
aaaaaaaaaaaaaa
 1
aaaaaaaaaaaaaaaa
 2

感谢您的宝贵时间!我试图尽可能地描述性,让我知道这是否不好(Whosebug 的新手)

您向缓冲区添加行的方式不正确:

static void *producer (void *args)
{
    char readline[MAXSIZE];
    // ...
        while(fgets(readline, MAXSIZE, file) == readline){

            printf("THIS IS THE READLINE in PRODUCER%s", readline);
            put(&buffer, readline);
        }
    // ...
}

请注意,您正在将行读入局部变量 readline,并将 readline 添加到缓冲区。然后你循环并用下一行覆盖 readline 的内容。因此,之前添加到缓冲区的内容将被覆盖。您需要做的是复制每一行。例如,如果您有一个 strdup() 函数:

        while(fgets(readline, MAXSIZE, file) == readline){

            printf("THIS IS THE READLINE in PRODUCER%s", readline);
            put(&buffer, strdup(readline));
        }

现在您添加到缓冲区的每个字符串都是分配的副本。您需要记得在使用后释放此副本。