简单的 C pthread 测试程序在执行期间挂起

Simple C pthread test program hangs during execution

我刚开始使用 C 中的 pthread 库,我有一项任务要 class 使用它们编写一个简单的程序。该程序的基本描述是它需要 1 个或多个包含网站名称的输入文件和 1 个输出文件名。然后我需要为每个输入文件创建 1 个线程来读取网站名称并将它们推送到队列中。然后我需要创建几个线程将这些名称从队列中拉出,找到它们的 IP 地址,然后将该信息写入输出文件。预期命令行参数如下:

./multi-lookup [one or more input files] [single output file name]

我的问题是这样的。每当我 运行 只有 1 个线程的程序将信息推送到输出文件时,一切正常。当我让它成为两个线程时,程序挂起,甚至打印了我的测试 "printf" 语句的 none。我最好的猜测是死锁以某种方式发生,我没有正确使用我的互斥体,但我不知道如何修复它。请帮忙!

如果您需要任何我没有提供的信息,请告诉我。很抱歉代码中缺少注释。

#include <stdlib.h>
#include <stdio.h> 
#include <string.h>
#include <errno.h>
#include <pthread.h>

#include "util.h"
#include "queue.h"

#define STRING_SIZE 1025
#define INPUTFS "%1024s"
#define USAGE "<inputFilePath> <outputFilePath>"
#define NUM_RESOLVERS 2

queue q;
pthread_mutex_t locks[2];
int requestors_finished;

void* requestors(void* input_file);
void* resolvers(void* output_file);

int main(int argc, char* argv[])
{
    FILE* inputfp = NULL;
    FILE* outputfp = NULL;
    char errorstr[STRING_SIZE];
    pthread_t requestor_threads[argc - 2];
    pthread_t resolver_threads[NUM_RESOLVERS];
    int return_code;
    requestors_finished = 0;

    if(queue_init(&q, 10) == QUEUE_FAILURE)
        fprintf(stderr, "Error: queue_init failed!\n");

    if(argc < 3)
    {
        fprintf(stderr, "Not enough arguments: %d\n", (argc - 1));
        fprintf(stderr, "Usage:\n %s %s\n", argv[0], USAGE);
        return 1;
    }

    pthread_mutex_init(&locks[0], NULL);
    pthread_mutex_init(&locks[1], NULL);

    int i;
    for(i = 0; i < (argc - 2); i++)
    {
        inputfp = fopen(argv[i+1], "r");
        if(!inputfp)
        {
            sprintf(errorstr, "Error Opening Input File: %s", argv[i]);
            perror(errorstr);
            break;
        }

        return_code = pthread_create(&(requestor_threads[i]), NULL,    requestors, inputfp);
        if(return_code)
        {
            printf("ERROR: return code from pthread_create() is %d\n", return_code);
            exit(1);
        }
    }

    outputfp = fopen(argv[i+1], "w");
    if(!outputfp)
    {
        sprintf(errorstr, "Errord opening Output File: %s", argv[i+1]);
        perror(errorstr);
        exit(1);
    }

    for(i = 0; i < NUM_RESOLVERS; i++)
    {
        return_code = pthread_create(&(resolver_threads[i]), NULL, resolvers, outputfp);
        if(return_code)
        {
            printf("ERROR: return code from pthread_create() is %d\n", return_code);
            exit(1);
        }

    }

    for(i = 0; i < (argc - 2); i++)
        pthread_join(requestor_threads[i], NULL);

    requestors_finished = 1;

    for(i = 0; i < NUM_RESOLVERS; i++)
        pthread_join(resolver_threads[i], NULL);

    pthread_mutex_destroy(&locks[0]);
    pthread_mutex_destroy(&locks[1]);

    return 0;
}



void* requestors(void* input_file)
{
    char* hostname = (char*) malloc(STRING_SIZE);
    FILE* input = input_file;

    while(fscanf(input, INPUTFS, hostname) > 0)
    {
        while(queue_is_full(&q))
            usleep((rand()%100));

        if(!queue_is_full(&q))
        {
            pthread_mutex_lock(&locks[0]);
            if(queue_push(&q, (void*)hostname) == QUEUE_FAILURE)
                fprintf(stderr, "Error: queue_push failed on %s\n", hostname);
            pthread_mutex_unlock(&locks[0]);
        }
        hostname = (char*) malloc(STRING_SIZE);
    }
    printf("%d\n", queue_is_full(&q));
    free(hostname);
    fclose(input);
    pthread_exit(NULL);
}



void* resolvers(void* output_file)
{
    char* hostname;
    char ipstr[INET6_ADDRSTRLEN];
    FILE* output = output_file;
    int is_empty = queue_is_empty(&q);

    //while(!queue_is_empty(&q) && !requestors_finished)
    while((!requestors_finished) || (!is_empty))
    {
        while(is_empty)
            usleep((rand()%100));
        pthread_mutex_lock(&locks[0]);
        hostname = (char*) queue_pop(&q);
        pthread_mutex_unlock(&locks[0]);

        if(dnslookup(hostname, ipstr, sizeof(ipstr)) == UTIL_FAILURE)
        {
            fprintf(stderr, "DNSlookup error: %s\n", hostname);
            strncpy(ipstr, "", sizeof(ipstr));
        }

        pthread_mutex_lock(&locks[1]);
        fprintf(output, "%s,%s\n", hostname, ipstr);
        pthread_mutex_unlock(&locks[1]);

        free(hostname);
        is_empty = queue_is_empty(&q);
    }
    pthread_exit(NULL);
}

虽然我对你的"queue.h"库不熟悉,但你需要注意以下几点:

当您检查队列是否为空时,您并没有获取互斥量,这意味着可能会发生以下情况:

  • 一些 requestors 线程检查是否为空(我们称它为 thread1)并在它执行之前 pthread_mutex_lock(&locks[0]); (以及 if(!queue_is_full(&q)) 之后)thread1 被切换上下文
  • 其他 requestors 线程填满了队列,当线程 1 最终获得互斥量时,如果将尝试插入到完整队列。现在,如果您的队列实现在尝试将更多元素插入已经满的队列时崩溃,线程 1 将永远不会解锁互斥锁,您将遇到死锁。

另一种情况:

  • 一些 resolver 线程 运行 的第一个 requestors_finished 最初是 0 所以 (!requestors_finished) || (!is_empty) 最初是正确的。
  • 但是因为队列还是空的is_empty是真的。
  • 这个线程将达到 while(is_empty) usleep((rand()%100)); 并永远休眠,因为你 pthread_join 这个线程你的程序将永远不会终止,因为这个值永远不会在循环中更新。

要记住的一般想法是,当您访问某些非原子资源并且可能被其他线程访问时,您需要确保您是唯一一个对此资源执行操作的人。

使用互斥锁是可以的,但你应该考虑到你无法预料何时会发生上下文切换,所以如果你想检查例如队列是否为空,你应该在锁定互斥锁而不是解锁它的同时执行此操作直到你完成它,否则不能保证在下一行执行时它会保持为空。

您可能还想考虑阅读有关 the consumer producer problem 的更多信息。

为了帮助您了解(和控制)消费者resolver)线程应该运行以及生产者 线程产生你应该考虑使用 conditional variables


一些杂项。东西:

  • pthread_t requestor_threads[argc - 2]; 正在使用 VLA 并且不是以一种好的方式 - 想想如果我没有给你的程序任何参数会发生什么。要么决定一些最大值并define它,要么在检查输入的有效性后动态创建它。
  • 恕我直言 requestors 线程应该自己打开文件

可能还有更多问题,但请先解决这些问题。