Pthread、屏障和共享数组的意外行为
Unexpected Behavior With Pthread, Barriers, and Sharing Array
我有一个使用 MPI + Pthread 的程序。我坚持实施 pthreads 以共享 read/writes 的数组。我在这里制作了模拟代码,模拟了这个问题。
#include <iostream>
#include <unistd.h>
#include <pthread.h>
struct args {
double* array;
int start;
int stop;
double myVal;
double* row;
pthread_barrier_t* barrier;
};
void* mythread(void* arguments){
struct args* args_ = (struct args*)arguments;
double* array = args_->array;
int start = args_->start;
int stop = args_->stop;
double myVal = args_->myVal;
pthread_barrier_t* barrier = args_->barrier;
double* row = args_->row;
for(int i = start; i < stop; i++){
pthread_barrier_wait(barrier);
for(int j = 0; j < 10; j++){
double a = row[j];
int ind = i*10 + j;
array[ind] = a + myVal;
}
}
}
int main(){
pthread_t threads[50];
int start_ = 0;
double* array_0 = NULL;
array_0 = new double[100*10];
double* row = NULL;
row = new double[10];
pthread_barrier_t barrier;
(void)pthread_barrier_init(&barrier, NULL, 50+1);
for(int n = 0; n < 50; n++){
struct args args_;
args_.start = start_;
args_.stop = start_ + 2;
start_ = start_ + 2;
args_.array = &array_0[0];
args_.myVal = n;
args_.row = row;
args_.barrier = &barrier;
(void)pthread_create(&threads[n], NULL, mythread, (void*)&args_);
}
for(int i = 0; i < 2; i++){
for(int k = 0; k < 10; k++){
row[k] = i+1;
}
// usleep(100);
pthread_barrier_wait(&barrier);
}
for(int n = 0; n < 50; n++){
(void)pthread_join(threads[n], NULL);
}
// print
for(int i = 0; i < 100; i++){
for(int j = 0; j < 10; j++){
int ind = i*10 + j;
std::cout << " " << array_0[ind];
}
std::cout << std::endl;
}
return 0;
}
主要生成 50 个线程。 Barrier 初始化为 50 + 1(包括主线程)。这应该同步 pthread_barrier_wait() 调用上的所有 51 个线程,但阻塞等待调用似乎不允许“行”数组写入循环在释放之前完成。
预期结果应该是:
1 1 1 1 1 1 1 1 1 1
2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2
3 3 3 3 3 3 3 3 3 3
3 3 3 3 3 3 3 3 3 3
4 4 4 4 4 4 4 4 4 4
4 4 4 4 4 4 4 4 4 4
5 5 5 5 5 5 5 5 5 5
5 5 5 5 5 5 5 5 5 5
.
.
.
.
.
.
etc.
实际输出是半随机的。它在某些线程中完成了序列,而在其他线程中它显示了零,就好像“行”从未被填满一样。在写入“行”数组后添加 usleep() 也无济于事——并不是说我睡得起在我的代码中运行。这让我相信我不明白指针数组是如何在线程之间正确共享的。我是 C++ 的新手,所以感谢您的帮助。
在您的循环中,您创建了一个 struct args
对象,然后将此对象的地址传递给 pthread_create
。然后在循环迭代结束时立即“销毁”该对象,并在下一次迭代时创建一个新对象,但是,新创建的线程仍然引用这个旧的“销毁”对象。
您需要确保传递给 pthread_create
的对象持续足够长的时间:
- 创建自己副本的线程
- 要完成的线程
作为一种非常简单的方法,您可以将 _args
的声明移到循环之外并将其转换为这样的数组:
struct args args_[50];
for(int n = 0; n < 50; n++){
args_[n].start = start_;
args_[n].stop = start_ + 2;
start_ = start_ + 2;
args_[n].array = &array_0[0];
args_[n].myVal = n;
args_[n].row = row;
args_[n].barrier = &barrier;
(void)pthread_create(&threads[n], NULL, mythread, (void*)&args_[n]);
}
args_[]
的生命周期现在比每个线程都长。或者,您可以动态分配 struct args
(例如使用 new
)并在线程中使用对象(例如使用 delete
)。或者,如果您使用的是 C++11 或更高版本,则可以使用 std::shared_ptr
和 std::thread
,有一些文档介绍了这两者之间的行为方式 here.
我有一个使用 MPI + Pthread 的程序。我坚持实施 pthreads 以共享 read/writes 的数组。我在这里制作了模拟代码,模拟了这个问题。
#include <iostream>
#include <unistd.h>
#include <pthread.h>
struct args {
double* array;
int start;
int stop;
double myVal;
double* row;
pthread_barrier_t* barrier;
};
void* mythread(void* arguments){
struct args* args_ = (struct args*)arguments;
double* array = args_->array;
int start = args_->start;
int stop = args_->stop;
double myVal = args_->myVal;
pthread_barrier_t* barrier = args_->barrier;
double* row = args_->row;
for(int i = start; i < stop; i++){
pthread_barrier_wait(barrier);
for(int j = 0; j < 10; j++){
double a = row[j];
int ind = i*10 + j;
array[ind] = a + myVal;
}
}
}
int main(){
pthread_t threads[50];
int start_ = 0;
double* array_0 = NULL;
array_0 = new double[100*10];
double* row = NULL;
row = new double[10];
pthread_barrier_t barrier;
(void)pthread_barrier_init(&barrier, NULL, 50+1);
for(int n = 0; n < 50; n++){
struct args args_;
args_.start = start_;
args_.stop = start_ + 2;
start_ = start_ + 2;
args_.array = &array_0[0];
args_.myVal = n;
args_.row = row;
args_.barrier = &barrier;
(void)pthread_create(&threads[n], NULL, mythread, (void*)&args_);
}
for(int i = 0; i < 2; i++){
for(int k = 0; k < 10; k++){
row[k] = i+1;
}
// usleep(100);
pthread_barrier_wait(&barrier);
}
for(int n = 0; n < 50; n++){
(void)pthread_join(threads[n], NULL);
}
// print
for(int i = 0; i < 100; i++){
for(int j = 0; j < 10; j++){
int ind = i*10 + j;
std::cout << " " << array_0[ind];
}
std::cout << std::endl;
}
return 0;
}
主要生成 50 个线程。 Barrier 初始化为 50 + 1(包括主线程)。这应该同步 pthread_barrier_wait() 调用上的所有 51 个线程,但阻塞等待调用似乎不允许“行”数组写入循环在释放之前完成。
预期结果应该是:
1 1 1 1 1 1 1 1 1 1
2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2
3 3 3 3 3 3 3 3 3 3
3 3 3 3 3 3 3 3 3 3
4 4 4 4 4 4 4 4 4 4
4 4 4 4 4 4 4 4 4 4
5 5 5 5 5 5 5 5 5 5
5 5 5 5 5 5 5 5 5 5
.
.
.
.
.
.
etc.
实际输出是半随机的。它在某些线程中完成了序列,而在其他线程中它显示了零,就好像“行”从未被填满一样。在写入“行”数组后添加 usleep() 也无济于事——并不是说我睡得起在我的代码中运行。这让我相信我不明白指针数组是如何在线程之间正确共享的。我是 C++ 的新手,所以感谢您的帮助。
在您的循环中,您创建了一个 struct args
对象,然后将此对象的地址传递给 pthread_create
。然后在循环迭代结束时立即“销毁”该对象,并在下一次迭代时创建一个新对象,但是,新创建的线程仍然引用这个旧的“销毁”对象。
您需要确保传递给 pthread_create
的对象持续足够长的时间:
- 创建自己副本的线程
- 要完成的线程
作为一种非常简单的方法,您可以将 _args
的声明移到循环之外并将其转换为这样的数组:
struct args args_[50];
for(int n = 0; n < 50; n++){
args_[n].start = start_;
args_[n].stop = start_ + 2;
start_ = start_ + 2;
args_[n].array = &array_0[0];
args_[n].myVal = n;
args_[n].row = row;
args_[n].barrier = &barrier;
(void)pthread_create(&threads[n], NULL, mythread, (void*)&args_[n]);
}
args_[]
的生命周期现在比每个线程都长。或者,您可以动态分配 struct args
(例如使用 new
)并在线程中使用对象(例如使用 delete
)。或者,如果您使用的是 C++11 或更高版本,则可以使用 std::shared_ptr
和 std::thread
,有一些文档介绍了这两者之间的行为方式 here.