如何实现多个线程多次加入一个进程? (pthread_mutex_lock)

How can I accomplish multiple threads to join multiple times into a process? (pthread_mutex_lock)

我想写一个程序,什么算到 100。我想用 10 个线程来完成这个,使用 pthread 锁。当程序进入一个线程时,它会生成一个 0-2 之间的数字,这个值将被添加到它的数组索引以及全局变量 sum 中。当 sum 值达到 100 时,每个线程都应该打印自己的数组值(整个数组的数量应该与 sum 变量相等)。

我的问题如下:总是第一个线程锁定互斥变量,但我想在所有线程之间分配任务(arr[1] = 100,每个其他 = 0,但我想要例如 arr[ 1] = 14,arr[2] = 8,以此类推,直到 100)。我哪里错了?

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>

pthread_mutex_t mutex;
int arr[10];
int sum = 0;

void *add (void* input) {

    int *id = (int*) input, x, s;
    int ind = *id;

    while (sum < 100) {
        while (1) {
            if (pthread_mutex_trylock(&mutex) != EBUSY)
                break;
        }

        if (sum < 100) {
            x = rand() % 3;
            arr[ind] = arr[ind] + x;
            sum += x;
        }

        pthread_mutex_unlock(&mutex);
        sleep(0.1);
    }

    printf("The %d. thread got %d points!\n", ind, arr[ind]);
    return NULL;
}

int main (void) {

    int i;
    pthread_t threads[10];
    pthread_mutex_init(&mutex, NULL);
    srand(time(NULL));

    for (i = 0; i < 10; i++) {
        if (pthread_create(&threads[i], NULL, add, &i)){
            perror("pthread_create");
            exit(1);
        }
    }

    for (i = 0; i < 10; i++) {
        if (pthread_join (threads[i], NULL)){
            perror("pthread_join");
        }
    }
    pthread_mutex_destroy(&mutex);
    return 0;
}

输出:

The 1. thread got 100 points!
The 2. thread got 0 points!
The 3. thread got 0 points!
The 4. thread got 0 points!
The 5. thread got 0 points!
The 6. thread got 0 points!
The 7. thread got 0 points!
The 8. thread got 0 points!
The 9. thread got 0 points!
The 0. thread got 0 points!

我发现了两个问题。

当我在 Windows 上用 MSys2 尝试这个时,我的输出是

The 1. thread got 101 points!
The 1. thread got 101 points!
The 2. thread got 0 points!
The 3. thread got 0 points!
The 5. thread got 0 points!
The 6. thread got 0 points!
The 6. thread got 0 points!
The 8. thread got 0 points!
The 9. thread got 0 points!
The 0. thread got 0 points!

您将循环计数器的地址 i 传递给线程。这个变量不是常量,这就是线程可能读取已经改变的值的原因。为了解决这个问题,我创建了一个数组 int ids[10] 并向每个线程传递了它自己的 ID 数组元素。

函数 sleep 使用了一个 unsigned int 参数,因此您有效地调用了 sleep(0)。使用 sleep(1) 程序运行速度较慢但按预期工作。如果你想睡几分之一秒,你可以使用 usleep

修改程序:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>

pthread_mutex_t mutex;
int arr[10];
int sum = 0;

void *add (void* input){

    int *id = (int*) input, x, s;
    int ind = *id;
    while (sum < 100){
        while (1){
            if (pthread_mutex_trylock(&mutex) != EBUSY)
                break;
        }
        if (sum < 100){
            x = rand() % 3;
            arr[ind] = arr[ind] + x;
            sum += x;
        }
        pthread_mutex_unlock(&mutex);
        usleep(100000);
    }
    printf("The %d. thread got %d points!\n", ind, arr[ind]);
    return NULL;
}

int main (void){

    int i;
    pthread_t threads[10];
    int ids[10];
    pthread_mutex_init(&mutex, NULL);
    srand(time(NULL));
    for (i = 0; i < 10; i++){
        ids[i] = i;
        if (pthread_create(&threads[i], NULL, add, &ids[i])){
            perror("pthread_create");
            exit(1);
        }
    }
    for (i = 0; i < 10; i++){
        if (pthread_join (threads[i], NULL)){
            perror("pthread_join");
        }
    }
    pthread_mutex_destroy(&mutex);
    return 0;
}

您的代码中有几个问题,与您询问的具体问题有不同程度的相关性。在我们继续之前,一个输出 I 从 运行 得到你写的代码可能是说明性的:

The 2. thread got 100 points!
The 8. thread got 0 points!
The 9. thread got 0 points!
The 5. thread got 0 points!
The 5. thread got 0 points!
The 6. thread got 0 points!
The 7. thread got 0 points!
The 2. thread got 100 points!
The 4. thread got 0 points!
The 1. thread got 0 points!

请注意,某些线程获得了相同的计数器索引。发生这种情况是因为主线程和每个子线程之间存在数据竞争:主线程将指针传递给其局部变量 i,然后继续修改该变量的值。同时子线程通过指针读取变量的值。这些操作不同步,因此行为未定义。

这个特殊问题有几个解决方案。最简单的可能是 cast ivoid * 并传递结果(按值)。这实际上很常见,您只想传递一个整数:

        if (pthread_create(&threads[i], NULL, add, (void *)i)) {
            // ...

当然需要线程函数再转换回来:

void *add (void* input) {
    int /*no need for 'id' */ x, s;
    int ind = (int) input;
    // ...

接下来,观察您在所有子线程中有 另一个 数据竞争,您在 while 的条件下读取 sum 的值环形。那个家伙此刻似乎并没有咬你,但它随时都可能咬你。由于线程作为一个组同时读取和修改 sum,因此您必须确保所有此类访问都是同步的——例如,仅在锁定互斥量时执行它们。

跳过一点(我们会回来的),您的 sleep() 通话有问题。该函数的参数是 int 类型,因此您的实际参数 double 0.1 被转换为 int,产生 0。编译器可能会完全优化它,如果它没有t,那么它可能实际上是一个空操作。 然而,更重要的是sleep() 是这项工作的错误工具,或者几乎所有与线程间同步和计时相关的工作。

现在认识到没有休眠,您应该看到外部 while 循环非常紧凑,因为刚刚解锁互斥锁的线程将立即尝试再次锁定它。这种重新锁定尝试在第一次尝试时就成功的情况很常见,因为默认的互斥行为不保证线程调度的公平性。

此外,pthread_mutex_trylock() 周围的特殊繁忙循环对您没有任何好处。只需使用 pthread_mutex_lock() 即可,否则当互斥体繁忙时您要做的就是重试。这将为您节省 CPU 时间,并且不会有明显不同的语义。

总的来说,你的策略有问题。互斥通常不提供任何关于公平调度的保证。通常对于像这样的紧密多线程交互,您需要手动执行某种调度管理,而这通常会从向组合中添加条件变量中受益匪浅。

看起来您不想强制线程轮流,但也许足以确保刚刚 运行 的线程不会立即再次被选中。在这种情况下,您可以添加一个共享变量来记录刚刚 运行 的线程的索引,并使用它来防止它再次被选为下一个线程。从示意图上看,线程函数可能看起来像这样

  • 无限循环:
    • 锁定互斥体
    • 无限循环:
      • 将最后一个线程索引与我的索引进行比较
      • 如果不同,则从(内部)循环中断
      • 否则等待条件变量
    • 将我的索引设置为最后一个线程索引
    • (仍然保持互斥锁锁定)广播或向 CV
    • 发送信号
    • 如果sum小于100
      • 更新sum
      • 解锁互斥体
    • 否则(sum >= 100)
      • 解锁互斥体
      • 脱离(外)循环