多线程程序每次运行输出不同的结果
Multithreaded program outputs different results every time it runs
我一直在尝试创建一个多线程程序来计算从 1 到 999 的 3 和 5 的倍数,但我似乎每次都无法正确计算 运行 它我得到了不同的值我认为这可能与我使用具有 10 个线程的共享变量这一事实有关,但我不知道如何解决这个问题。此外,如果我计算从 1 到 9 的 3 和 5 的倍数,该程序确实有效。
#include <stdlib.h>
#include <stdio.h>
#include <omp.h>
#include <string.h>
#define NUM_THREADS 10
#define MAX 1000
//finds multiples of 3 and 5 and sums up all of the multiples
int main(int argc, char ** argv)
{
omp_set_num_threads(10);//set number of threads to be used in the parallel loop
unsigned int NUMS[1000] = { 0 };
int j = 0;
#pragma omp parallel
{
int ID = omp_get_thread_num();//get thread ID
int i;
for(i = ID + 1;i < MAX; i+= NUM_THREADS)
{
if( i % 5 == 0 || i % 3 == 0)
{
NUMS[j++] = i;//Store Multiples of 3 and 5 in an array to sum up later
}
}
}
int i = 0;
unsigned int total;
for(i = 0; NUMS[i] != 0; i++)total += NUMS[i];//add up multiples of 3 and 5
printf("Total : %d\n", total);
return 0;
}
"j++" 不是原子操作。
表示"take the value contained at the storage location called j, use it in the current statement, add one to it, then store it back in the same location it came from".
(这是简单的答案。优化以及是否将值保存在寄存器中可以而且将会改变更多。)
当您有多个线程同时对同一个变量执行此操作时,您会得到不同且不可预测的结果。
您可以使用线程变量来解决这个问题。
您遇到的问题是线程不一定按顺序执行,因此最后一个要传输的线程可能没有按顺序读取值,因此您覆盖了错误的数据。
有一种形式可以设置循环中的线程,当它们完成时使用 openmp 选项进行总结。你必须写这样的东西才能使用它。
#pragma omp parallel for reduction(+:sum)
for(k=0;k<num;k++)
{
sum = sum + A[k]*B[k];
}
/* Fin del computo */
gettimeofday(&fin,NULL);
你所要做的就是将结果写入 "sum",这是我用旧代码做的总结。
你的另一个选择是肮脏的。以某种方式,使用对 OS 的调用让线程等待并按顺序排列。这比看起来容易。这将是一个解决方案。
#pragma omp parallel
for(i = ID + 1;i < MAX; i+= NUM_THREADS)
{
printf("asdasdasdasdasdasdasdas");
if( i % 5 == 0 || i % 3 == 0)
{
NUMS[j++] = i;//Store Multiples of 3 and 5 in an array to sum up later
}
}
但我建议您完整阅读 openmp 选项。
这是一个典型的线程同步问题。为了任何所需操作的原子性(在您的情况下增加变量 j 的值),您需要做的就是使用内核同步对象。根据您使用的操作系统,它可以是 mutex、semaphore 或 event 对象在。但是无论你的开发环境是什么,为了提供原子性,基本的流程逻辑应该像下面的伪代码:
{
lock(kernel_object)
// ...
// do your critical operation (increment your variable j in your case)
// ++j;
// ...
unlock(kernel_object)
}
如果您在 Windows 操作系统上工作,环境会提供一些特殊的同步机制(即:InterlockedIncrement 或 CreateCriticalSection 等)如果你在基于 Unix/Linux 的操作系统上工作,你可以使用 mutex 或 semaphore内核同步对象。实际上所有这些同步机制都源于信号量的概念,信号量是由 Edsger W. Dijkstra 在 1960 年代初发明的。
下面是一些基本示例:
Linux
#include <pthread.h>
pthread_mutex_t g_mutexObject = PTHREAD_MUTEX_INITIALIZER;
int main(int argc, char* argv[])
{
// ...
pthread_mutex_lock(&g_mutexObject);
++j; // incrementing j atomically
pthread_mutex_unlock(&g_mutexObject);
// ...
pthread_mutex_destroy(&g_mutexObject);
// ...
exit(EXIT_SUCCESS);
}
Windows
#include <Windows.h>
CRITICAL_SECTION g_csObject;
int main(void)
{
// ...
InitializeCriticalSection(&g_csObject);
// ...
EnterCriticalSection(&g_csObject);
++j; // incrementing j atomically
LeaveCriticalSection(&g_csObject);
// ...
DeleteCriticalSection(&g_csObject);
// ...
exit(EXIT_SUCCESS);
}
或者只是简单地:
#include <Windows.h>
LONG volatile g_j; // our little j must be volatile in here now
int main(void)
{
// ...
InterlockedIncrement(&g_j); // incrementing j atomically
// ...
exit(EXIT_SUCCESS);
}
在您的代码中 j
是共享的 inductive variable。您不能依赖于对多个线程有效地使用共享归纳变量(每次迭代使用 atomic
效率不高)。
您可以找到一个不使用归纳变量的特殊解决方案(例如使用 wheel factorization,15 个中有七个辐条 {0,3,5,6,9,10,12}
),或者您可以找到一个使用私有归纳变量的通用解决方案,如下所示
#pragma omp parallel
{
int k = 0;
unsigned int NUMS_local[MAX] = {0};
#pragma omp for schedule(static) nowait reduction(+:total)
for(i=0; i<MAX; i++) {
if(i%5==0 || i%3==0) {
NUMS_local[k++] = i;
total += i;
}
}
#pragma omp for schedule(static) ordered
for(i=0; i<omp_get_num_threads(); i++) {
#pragma omp ordered
{
memcpy(&NUMS[j], NUMS_local, sizeof *NUMS *k);
j += k;
}
}
}
但是,此解决方案并未充分利用内存。更好的解决方案是使用 C++ 中的 std::vector
之类的东西,你可以在 C 中使用 realloc
来实现,但我不会为你这样做。
编辑:
这是一个特殊的解决方案,它不使用使用轮因式分解的共享归纳变量
int wheel[] = {0,3,5,6,9,10,12};
int n = MAX/15;
#pragma omp parallel for reduction(+:total)
for(int i=0; i<n; i++) {
for(int k=0; k<7; k++) {
NUMS[7*i + k] = 7*i + wheel[k];
total += NUMS[7*i + k];
}
}
//now clean up for MAX not a multiple of 15
int j = n*7;
for(int i=n*15; i<MAX; i++) {
if(i%5==0 || i%3==0) {
NUMS[j++] = i;
total += i;
}
}
编辑:可以在没有关键部分的情况下执行此操作(来自 ordered
子句)。这 memcpy
是并行的,并且至少对于共享数组也能更好地利用内存。
int *NUMS;
int *prefix;
int total=0, j;
#pragma omp parallel
{
int i;
int nthreads = omp_get_num_threads();
int ithread = omp_get_thread_num();
#pragma omp single
{
prefix = malloc(sizeof *prefix * (nthreads+1));
prefix[0] = 0;
}
int k = 0;
unsigned int NUMS_local[MAX] = {0};
#pragma omp for schedule(static) nowait reduction(+:total)
for(i=0; i<MAX; i++) {
if(i%5==0 || i%3==0) {
NUMS_local[k++] = i;
total += i;
}
}
prefix[ithread+1] = k;
#pragma omp barrier
#pragma omp single
{
for(i=1; i<nthreads+1; i++) prefix[i+1] += prefix[i];
NUMS = malloc(sizeof *NUMS * prefix[nthreads]);
j = prefix[nthreads];
}
memcpy(&NUMS[prefix[ithread]], NUMS_local, sizeof *NUMS *k);
}
free(prefix);
我一直在尝试创建一个多线程程序来计算从 1 到 999 的 3 和 5 的倍数,但我似乎每次都无法正确计算 运行 它我得到了不同的值我认为这可能与我使用具有 10 个线程的共享变量这一事实有关,但我不知道如何解决这个问题。此外,如果我计算从 1 到 9 的 3 和 5 的倍数,该程序确实有效。
#include <stdlib.h>
#include <stdio.h>
#include <omp.h>
#include <string.h>
#define NUM_THREADS 10
#define MAX 1000
//finds multiples of 3 and 5 and sums up all of the multiples
int main(int argc, char ** argv)
{
omp_set_num_threads(10);//set number of threads to be used in the parallel loop
unsigned int NUMS[1000] = { 0 };
int j = 0;
#pragma omp parallel
{
int ID = omp_get_thread_num();//get thread ID
int i;
for(i = ID + 1;i < MAX; i+= NUM_THREADS)
{
if( i % 5 == 0 || i % 3 == 0)
{
NUMS[j++] = i;//Store Multiples of 3 and 5 in an array to sum up later
}
}
}
int i = 0;
unsigned int total;
for(i = 0; NUMS[i] != 0; i++)total += NUMS[i];//add up multiples of 3 and 5
printf("Total : %d\n", total);
return 0;
}
"j++" 不是原子操作。
表示"take the value contained at the storage location called j, use it in the current statement, add one to it, then store it back in the same location it came from".
(这是简单的答案。优化以及是否将值保存在寄存器中可以而且将会改变更多。)
当您有多个线程同时对同一个变量执行此操作时,您会得到不同且不可预测的结果。
您可以使用线程变量来解决这个问题。
您遇到的问题是线程不一定按顺序执行,因此最后一个要传输的线程可能没有按顺序读取值,因此您覆盖了错误的数据。
有一种形式可以设置循环中的线程,当它们完成时使用 openmp 选项进行总结。你必须写这样的东西才能使用它。
#pragma omp parallel for reduction(+:sum)
for(k=0;k<num;k++)
{
sum = sum + A[k]*B[k];
}
/* Fin del computo */
gettimeofday(&fin,NULL);
你所要做的就是将结果写入 "sum",这是我用旧代码做的总结。
你的另一个选择是肮脏的。以某种方式,使用对 OS 的调用让线程等待并按顺序排列。这比看起来容易。这将是一个解决方案。
#pragma omp parallel
for(i = ID + 1;i < MAX; i+= NUM_THREADS)
{
printf("asdasdasdasdasdasdasdas");
if( i % 5 == 0 || i % 3 == 0)
{
NUMS[j++] = i;//Store Multiples of 3 and 5 in an array to sum up later
}
}
但我建议您完整阅读 openmp 选项。
这是一个典型的线程同步问题。为了任何所需操作的原子性(在您的情况下增加变量 j 的值),您需要做的就是使用内核同步对象。根据您使用的操作系统,它可以是 mutex、semaphore 或 event 对象在。但是无论你的开发环境是什么,为了提供原子性,基本的流程逻辑应该像下面的伪代码:
{
lock(kernel_object)
// ...
// do your critical operation (increment your variable j in your case)
// ++j;
// ...
unlock(kernel_object)
}
如果您在 Windows 操作系统上工作,环境会提供一些特殊的同步机制(即:InterlockedIncrement 或 CreateCriticalSection 等)如果你在基于 Unix/Linux 的操作系统上工作,你可以使用 mutex 或 semaphore内核同步对象。实际上所有这些同步机制都源于信号量的概念,信号量是由 Edsger W. Dijkstra 在 1960 年代初发明的。
下面是一些基本示例:
Linux
#include <pthread.h>
pthread_mutex_t g_mutexObject = PTHREAD_MUTEX_INITIALIZER;
int main(int argc, char* argv[])
{
// ...
pthread_mutex_lock(&g_mutexObject);
++j; // incrementing j atomically
pthread_mutex_unlock(&g_mutexObject);
// ...
pthread_mutex_destroy(&g_mutexObject);
// ...
exit(EXIT_SUCCESS);
}
Windows
#include <Windows.h>
CRITICAL_SECTION g_csObject;
int main(void)
{
// ...
InitializeCriticalSection(&g_csObject);
// ...
EnterCriticalSection(&g_csObject);
++j; // incrementing j atomically
LeaveCriticalSection(&g_csObject);
// ...
DeleteCriticalSection(&g_csObject);
// ...
exit(EXIT_SUCCESS);
}
或者只是简单地:
#include <Windows.h>
LONG volatile g_j; // our little j must be volatile in here now
int main(void)
{
// ...
InterlockedIncrement(&g_j); // incrementing j atomically
// ...
exit(EXIT_SUCCESS);
}
在您的代码中 j
是共享的 inductive variable。您不能依赖于对多个线程有效地使用共享归纳变量(每次迭代使用 atomic
效率不高)。
您可以找到一个不使用归纳变量的特殊解决方案(例如使用 wheel factorization,15 个中有七个辐条 {0,3,5,6,9,10,12}
),或者您可以找到一个使用私有归纳变量的通用解决方案,如下所示
#pragma omp parallel
{
int k = 0;
unsigned int NUMS_local[MAX] = {0};
#pragma omp for schedule(static) nowait reduction(+:total)
for(i=0; i<MAX; i++) {
if(i%5==0 || i%3==0) {
NUMS_local[k++] = i;
total += i;
}
}
#pragma omp for schedule(static) ordered
for(i=0; i<omp_get_num_threads(); i++) {
#pragma omp ordered
{
memcpy(&NUMS[j], NUMS_local, sizeof *NUMS *k);
j += k;
}
}
}
但是,此解决方案并未充分利用内存。更好的解决方案是使用 C++ 中的 std::vector
之类的东西,你可以在 C 中使用 realloc
来实现,但我不会为你这样做。
编辑:
这是一个特殊的解决方案,它不使用使用轮因式分解的共享归纳变量
int wheel[] = {0,3,5,6,9,10,12};
int n = MAX/15;
#pragma omp parallel for reduction(+:total)
for(int i=0; i<n; i++) {
for(int k=0; k<7; k++) {
NUMS[7*i + k] = 7*i + wheel[k];
total += NUMS[7*i + k];
}
}
//now clean up for MAX not a multiple of 15
int j = n*7;
for(int i=n*15; i<MAX; i++) {
if(i%5==0 || i%3==0) {
NUMS[j++] = i;
total += i;
}
}
编辑:可以在没有关键部分的情况下执行此操作(来自 ordered
子句)。这 memcpy
是并行的,并且至少对于共享数组也能更好地利用内存。
int *NUMS;
int *prefix;
int total=0, j;
#pragma omp parallel
{
int i;
int nthreads = omp_get_num_threads();
int ithread = omp_get_thread_num();
#pragma omp single
{
prefix = malloc(sizeof *prefix * (nthreads+1));
prefix[0] = 0;
}
int k = 0;
unsigned int NUMS_local[MAX] = {0};
#pragma omp for schedule(static) nowait reduction(+:total)
for(i=0; i<MAX; i++) {
if(i%5==0 || i%3==0) {
NUMS_local[k++] = i;
total += i;
}
}
prefix[ithread+1] = k;
#pragma omp barrier
#pragma omp single
{
for(i=1; i<nthreads+1; i++) prefix[i+1] += prefix[i];
NUMS = malloc(sizeof *NUMS * prefix[nthreads]);
j = prefix[nthreads];
}
memcpy(&NUMS[prefix[ithread]], NUMS_local, sizeof *NUMS *k);
}
free(prefix);