Pthread、屏障和共享数组的意外行为

Unexpected Behavior With Pthread, Barriers, and Sharing Array

我有一个使用 MPI + Pthread 的程序。我坚持实施 pthreads 以共享 read/writes 的数组。我在这里制作了模拟代码,模拟了这个问题。

#include <iostream>
#include <unistd.h>
#include <pthread.h>

struct args {
    double* array;
    int start;
    int stop;
    double myVal;
    double* row;
    pthread_barrier_t* barrier;
};

void* mythread(void* arguments){
    struct args* args_ = (struct args*)arguments;
    double* array = args_->array;
    int start = args_->start;
    int stop = args_->stop;
    double myVal = args_->myVal;
    pthread_barrier_t* barrier = args_->barrier;
    double* row = args_->row;

    for(int i = start; i < stop; i++){
        pthread_barrier_wait(barrier);
        for(int j = 0; j < 10; j++){
            double a = row[j];
            int ind = i*10 + j;
            array[ind] = a + myVal;
        }
    }
}


int main(){

    pthread_t threads[50];
    int start_ = 0;
    double* array_0 = NULL;
    array_0 = new double[100*10];
    double* row = NULL;
    row = new double[10];
    pthread_barrier_t barrier;
    (void)pthread_barrier_init(&barrier, NULL, 50+1);

    for(int n = 0; n < 50; n++){
        struct args args_;
        args_.start = start_;
        args_.stop = start_ + 2;
        start_ = start_ + 2;
        args_.array = &array_0[0];
        args_.myVal = n;
        args_.row = row;
        args_.barrier = &barrier;
        (void)pthread_create(&threads[n], NULL, mythread, (void*)&args_);
    }

    for(int i = 0; i < 2; i++){
        for(int k = 0; k < 10; k++){
            row[k] = i+1;
        }
        // usleep(100);
        pthread_barrier_wait(&barrier);
    }

    for(int n = 0; n < 50; n++){
        (void)pthread_join(threads[n], NULL);
    }
    

    // print
    for(int i = 0; i < 100; i++){
        for(int j = 0; j < 10; j++){
            int ind = i*10 + j;
            std::cout << " " << array_0[ind];
        }
        std::cout << std::endl;
    }
    return 0;
}

主要生成 50 个线程。 Barrier 初始化为 50 + 1(包括主线程)。这应该同步 pthread_barrier_wait() 调用上的所有 51 个线程,但阻塞等待调用似乎不允许“行”数组写入循环在释放之前完成。

预期结果应该是:

1 1 1 1 1 1 1 1 1 1
2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2
3 3 3 3 3 3 3 3 3 3
3 3 3 3 3 3 3 3 3 3
4 4 4 4 4 4 4 4 4 4
4 4 4 4 4 4 4 4 4 4
5 5 5 5 5 5 5 5 5 5
5 5 5 5 5 5 5 5 5 5
.
.
.
.
.
.
etc.

实际输出是半随机的。它在某些线程中完成了序列,而在其他线程中它显示了零,就好像“行”从未被填满一样。在写入“行”数组后添加 usleep() 也无济于事——并不是说我睡得起在我的代码中运行。这让我相信我不明白指针数组是如何在线程之间正确共享的。我是 C++ 的新手,所以感谢您的帮助。

在您的循环中,您创建了一个 struct args 对象,然后将此对象的地址传递给 pthread_create。然后在循环迭代结束时立即“销毁”该对象,并在下一次迭代时创建一个新对象,但是,新创建的线程仍然引用这个旧的“销毁”对象。

您需要确保传递给 pthread_create 的对象持续足够长的时间:

  1. 创建自己副本的线程
  2. 要完成的线程

作为一种非常简单的方法,您可以将 _args 的声明移到循环之外并将其转换为这样的数组:

struct args args_[50];

for(int n = 0; n < 50; n++){
    args_[n].start = start_;
    args_[n].stop = start_ + 2;
    start_ = start_ + 2;
    args_[n].array = &array_0[0];
    args_[n].myVal = n;
    args_[n].row = row;
    args_[n].barrier = &barrier;
    (void)pthread_create(&threads[n], NULL, mythread, (void*)&args_[n]);
}

args_[] 的生命周期现在比每个线程都长。或者,您可以动态分配 struct args(例如使用 new)并在线程中使用对象(例如使用 delete)。或者,如果您使用的是 C++11 或更高版本,则可以使用 std::shared_ptrstd::thread,有一些文档介绍了这两者之间的行为方式 here.