我如何并行化 C 中的下一个代码,以便它遍历所有行和列? (线性回归程序)
How do I parallelize the next code in C so it iterates through all rows and columns? (linear regression program)
我正在通过线性回归计算程序在 C 中进行并行化分配,但我只是应该在线性计算之前并行化计算所有加法的部分。
原始代码。参数:元素数量
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#define N 50000
int nn;
int *X[N+1],*apX, *Y;
long long *sumaX, *sumaX2, sumaY, *sumaXY;
double *A, *B;
int main(int np, char*p[])
{
int i,j;
double sA,sB;
clock_t ta,t;
assert(np==2);
nn = atoi(p[1]);
assert(nn<=N);
srand(1);
printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024));
apX = calloc(nn*nn,sizeof(int)); assert (apX);
Y = calloc(nn,sizeof(int)); assert (Y);
sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
A = calloc(nn,sizeof(double)); assert (A);
B = calloc(nn,sizeof(double)); assert (B);
// Initialization
X[0] = apX;
/*for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=rand()%100+1;
Y[i]=rand()%100 - 49;
X[i+1] = X[i] + nn;
}*/
for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=90;
Y[i]=40;
X[i+1] = X[i] + nn;
}
// add (parallelization part)
sumaY = 0;
for (i=0;i<nn;i++) {
sumaX[i] = sumaX2[i] = sumaXY[i] = 0;
for (j=0;j<nn;j++) {
sumaX[i] += X[i][j];
sumaX2[i] += X[i][j] * X[i][j];
sumaXY[i] += X[i][j] * Y[j];
}
sumaY += Y[i];
}
// linearity calculation
for (i=0;i<nn;i++) {
B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
A[i] = (sumaY -B[i]*sumaX[i])/nn;
}
// check
sA = sB = 0;
for (i=0;i<nn;i++) {
//printf("%lg, %lg\n",A[i],B[i]);
sA += A[i];
sB += B[i];
}
printf("Suma elements de A: %lg B:%lg\n",sA,sB);
exit(0);
}
并行化。参数:元素和线程数
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#define N 50000
#define MAX_THREADS 256
int nn, numThreads;
int *X[N+1],*apX, *Y;
long *sumaX, *sumaX2, sumaY, *sumaXY;
double *A, *B;
int range[MAX_THREADS];
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
int ret;
void * parallel_code(void * id){
int index = (intptr_t) id;
int i, ini, row, col;
int rowAux = -5;
if(index == 0)
ini = 0;
else
ini = range[index-1];
for(i=ini; i<range[index]; i++){
row = i/nn;
col = i%nn;
sumaX[row] += X[row][col];
sumaX2[row] += X[row][col] * X[row][col];
sumaXY[row] += X[row][col] * Y[col];
pthread_mutex_lock(&mutex);
if(rowAux != row){
sumaY += Y[row];
rowAux = row;
pthread_mutex_unlock(&mutex);
}else{
pthread_mutex_unlock(&mutex);
}
}
pthread_exit(0);
}
int main(int np, char*p[])
{
int i,j,index;
double sA,sB;
clock_t ta,t;
pthread_t threads[MAX_THREADS];
assert(np==3);
nn = atoi(p[1]);
assert(nn<=N);
srand(1);
numThreads = atoi(p[2]);
assert(numThreads >= 2 && numThreads <= MAX_THREADS);
printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024));
memset(range,0,numThreads*sizeof(int));
apX = calloc(nn*nn,sizeof(int)); assert (apX);
Y = calloc(nn,sizeof(int)); assert (Y);
sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
A = calloc(nn,sizeof(double)); assert (A);
B = calloc(nn,sizeof(double)); assert (B);
// Inicialitzacio
/*X[0] = apX;
for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=rand()%100+1;
Y[i]=rand()%100 - 49;
X[i+1] = X[i] + nn;
}*/
X[0] = apX;
for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=90;
Y[i]=40;
X[i+1] = X[i] + nn;
}
int portion = nn*nn/numThreads;
int mod = nn*nn % numThreads;
if(mod != 0.00){
mod = mod*numThreads;
for(i=0; i<mod; i++){
range[i] = range[i] + 1;
}
}
range[0] = range[0] + portion;
for(i=1; i<numThreads; i++){
range[i] += range[i-1] + portion;
}
sumaY = 0;
pthread_mutex_init(&mutex, NULL);
for (index = 0; index < numThreads; index++)
{
assert(!pthread_create(&threads[index], NULL, parallel_code, (void *) (intptr_t)index));
}
for(index = 0; index < numThreads; index++)
{
assert(!pthread_join(threads[index], NULL ));
}
pthread_mutex_destroy(&mutex);
for (i=0;i<nn;i++) {
B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
A[i] = (sumaY -B[i]*sumaX[i])/nn;
}
// check
sA = 0;
sB = 0;
for (i=0;i<nn;i++) {
//printf("%f, %f\n",sA,sB);
sA += A[i];
sB += B[i];
}
printf("Suma elements de A: %lg B:%lg\n",sA,sB);
exit(0);
}
到目前为止,我已经完成了一些并行化,您可以在上面的代码中看到:计算每个线程必须处理的数据量(这就是变量“部分”和“mod”的用途) ),为每个部分创建了线程,创建了一个互斥量来控制 SumaY 访问...问题是当前程序只使用小值(例如 2000 作为元素数量)并且不知道为什么。可能是因为线程没有读取所有必需的列 and/or 行,因为每次程序显示错误值时,该值总是低于正确值,因此可能表明程序缺少一些要读取的数据。公平地说,我认为我非常接近正确的解决方案,所以我作为最后的资源来到这里。此外,考虑到将任务拆分为每个线程的多个部分是分配的必要条件。
非常感谢。
27/01/2021 编辑(工作代码,快速和慢速版本):
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#define N 50000
#define MAX_THREADS 256
int nn, numThreads;
int *X[N+1],*apX, *Y;
long long *sumaX, *sumaX2, sumaY, *sumaXY;
double *A, *B;
int range[MAX_THREADS];
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
// Slow version
//int visitedRows[N];
void * parallel_code(void * args){
int index = (*(int*)args);
int i, j, ini;
// Slow version
// int row, col;
if(index == 0)
ini = 0;
else
ini = range[index-1];
for(i=ini; i<range[index]; i++){
// Fast version
for (j=0;j<nn;j++) {
sumaX[i] += X[i][j];
sumaX2[i] += X[i][j] * X[i][j];
sumaXY[i] += X[i][j] * Y[j];
}
pthread_mutex_lock(&mutex);
sumaY += Y[i];
pthread_mutex_unlock(&mutex);
// Slow version
/*row = i/nn;
col = i%nn;
sumaX[row] += X[row][col];
sumaX2[row] += X[row][col] * X[row][col];
sumaXY[row] += X[row][col] * Y[col];
pthread_mutex_lock(&mutex);
if(visitedRows[row] == 0){
visitedRows[row] = 1;
sumaY += Y[row];
pthread_mutex_unlock(&mutex);
}else{
pthread_mutex_unlock(&mutex);
}*/
}
pthread_exit(0);
}
int main(int np, char*p[])
{
int i,j,index;
double sA,sB;
unsigned int thread_args[MAX_THREADS];
pthread_t threads[MAX_THREADS];
assert(np==3);
nn = atoi(p[1]);
assert(nn<=N);
srand(1);
numThreads = atoi(p[2]);
assert(numThreads >= 2 && numThreads <= MAX_THREADS);
printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024));
memset(range,0,numThreads*sizeof(int));
// Slow version
//memset(visitedRows,0,nn*sizeof(int));
apX = calloc(nn*nn,sizeof(int)); assert (apX);
Y = calloc(nn,sizeof(int)); assert (Y);
sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
A = calloc(nn,sizeof(double)); assert (A);
B = calloc(nn,sizeof(double)); assert (B);
// Inicialitzacio
X[0] = apX;
for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=rand()%100+1;
Y[i]=rand()%100 - 49;
X[i+1] = X[i] + nn;
}
// Fast version
int portion = nn/numThreads;
int mod = nn % numThreads;
// Slow version
//int portion = nn*nn/numThreads;
//int mod = nn*nn % numThreads;
for(i=0; i<numThreads; i++){
range[i] = portion;
if (i != 0) range[i] += range[i-1];
if (i < mod) range[i]++;
}
sumaY = 0;
pthread_mutex_init(&mutex, NULL);
for (index = 0; index < numThreads; index++)
{
thread_args[index] = index;
assert(!pthread_create(&threads[index], NULL, parallel_code, &thread_args[index]));
}
for(index = 0; index < numThreads; index++)
{
assert(!pthread_join(threads[index], NULL ));
}
pthread_mutex_destroy(&mutex);
for (i=0;i<nn;i++) {
B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
A[i] = (sumaY -B[i]*sumaX[i])/nn;
}
// check
sA = sB = 0;
for (i=0;i<nn;i++) {
//printf("%f, %f\n",sA,sB);
sA += A[i];
sB += B[i];
}
printf("Suma elements de A: %lg B:%lg\n",sA,sB);
exit(0);
}
互斥体初始化和销毁
您已经通过静态初始化程序初始化了 mutex
,通过 pthread_mutex_init()
再次初始化它是错误的(除非您先使用 pthread_mutex_destroy()
将其拆除)。
此外,在加入所有线程后拆除互斥锁是没有必要的,尽管没有错。
工作分配,第 1 部分
这是错误的:
int mod = nn*nn % numThreads;
if(mod != 0.00){
mod = mod*numThreads;
for(i=0; i<mod; i++){
range[i] = range[i] + 1;
}
}
我认为您正试图在线程之间分配多余的 nn * nn
数据,但只有 mod
个多余的元素,而不是 mod * NumThreads
个。我想你的意思是
int mod = nn*nn % numThreads;
// No need to pre-test whether mod is nonzero.
// mod is used as originally computed, not multiplied by numThreads.
for (i = 0; i < mod; i++) {
range[i] = range[i] + 1;
}
原始版本不仅设置范围不正确,而且当 nn*nn % numThreads
大于 1 时,运行 还会超出数组 range
的边界。
但可能这一切都没有实际意义。见下文。
工作分配,第 2 部分
我怀疑主要问题是这些行...
sumaX[row] += X[row][col];
sumaX2[row] += X[row][col] * X[row][col];
sumaXY[row] += X[row][col] * Y[col];
... 由线程函数执行而不锁定互斥锁。 sumX
、sumX2
和sumXY
指向线程之间共享的数据,并且由于工作已经在它们之间进行了拆分,所以完全有可能多个线程贡献相同的数据元素。在那种情况下,您会发生数据竞争,并且结果行为是不确定的。
天真地,您可以通过将这些计算移动到临界区内,在 pthread_mutex_lock()
之后来解决该问题,因为现在您确实在循环的每次迭代中锁定和解锁互斥量。但这有几个问题,尤其是:
- 你会挤出大部分已经受限的线程并发机会;和
- 互斥锁操作相对昂贵,否则每次循环迭代只有少数算术运算,那么频繁地锁定和解锁互斥锁可能会影响性能。
如果您采用这种方式并行版本不比串行版本慢,我会感到惊讶。
你应该做的是将实际使用的线程数限制为最多数据行数,并以整行为基础将数据分配给线程。不应将任何行拆分为两个或多个线程。这将在不破坏并行化目的的情况下消除上述数据竞争。
我还会修改线程函数,以便它仅在将每行结果添加到全局总和时才锁定互斥锁。这将使您获得比现在更多的并发性。
这将使您在线程之间的数据分配不那么均匀,并且总体上可能会减少线程数,但是无论如何,拥有比 运行 执行单元更多的线程对您没有帮助。当行数相对于线程数较大时,不均匀的影响不会很明显,而在小情况下,总体 运行 时间首先并不是什么大问题.更重要的是,计算应该产生正确的结果,并且锁定的减少应该会显着提高性能。
我正在通过线性回归计算程序在 C 中进行并行化分配,但我只是应该在线性计算之前并行化计算所有加法的部分。
原始代码。参数:元素数量
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#define N 50000
int nn;
int *X[N+1],*apX, *Y;
long long *sumaX, *sumaX2, sumaY, *sumaXY;
double *A, *B;
int main(int np, char*p[])
{
int i,j;
double sA,sB;
clock_t ta,t;
assert(np==2);
nn = atoi(p[1]);
assert(nn<=N);
srand(1);
printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024));
apX = calloc(nn*nn,sizeof(int)); assert (apX);
Y = calloc(nn,sizeof(int)); assert (Y);
sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
A = calloc(nn,sizeof(double)); assert (A);
B = calloc(nn,sizeof(double)); assert (B);
// Initialization
X[0] = apX;
/*for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=rand()%100+1;
Y[i]=rand()%100 - 49;
X[i+1] = X[i] + nn;
}*/
for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=90;
Y[i]=40;
X[i+1] = X[i] + nn;
}
// add (parallelization part)
sumaY = 0;
for (i=0;i<nn;i++) {
sumaX[i] = sumaX2[i] = sumaXY[i] = 0;
for (j=0;j<nn;j++) {
sumaX[i] += X[i][j];
sumaX2[i] += X[i][j] * X[i][j];
sumaXY[i] += X[i][j] * Y[j];
}
sumaY += Y[i];
}
// linearity calculation
for (i=0;i<nn;i++) {
B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
A[i] = (sumaY -B[i]*sumaX[i])/nn;
}
// check
sA = sB = 0;
for (i=0;i<nn;i++) {
//printf("%lg, %lg\n",A[i],B[i]);
sA += A[i];
sB += B[i];
}
printf("Suma elements de A: %lg B:%lg\n",sA,sB);
exit(0);
}
并行化。参数:元素和线程数
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#define N 50000
#define MAX_THREADS 256
int nn, numThreads;
int *X[N+1],*apX, *Y;
long *sumaX, *sumaX2, sumaY, *sumaXY;
double *A, *B;
int range[MAX_THREADS];
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
int ret;
void * parallel_code(void * id){
int index = (intptr_t) id;
int i, ini, row, col;
int rowAux = -5;
if(index == 0)
ini = 0;
else
ini = range[index-1];
for(i=ini; i<range[index]; i++){
row = i/nn;
col = i%nn;
sumaX[row] += X[row][col];
sumaX2[row] += X[row][col] * X[row][col];
sumaXY[row] += X[row][col] * Y[col];
pthread_mutex_lock(&mutex);
if(rowAux != row){
sumaY += Y[row];
rowAux = row;
pthread_mutex_unlock(&mutex);
}else{
pthread_mutex_unlock(&mutex);
}
}
pthread_exit(0);
}
int main(int np, char*p[])
{
int i,j,index;
double sA,sB;
clock_t ta,t;
pthread_t threads[MAX_THREADS];
assert(np==3);
nn = atoi(p[1]);
assert(nn<=N);
srand(1);
numThreads = atoi(p[2]);
assert(numThreads >= 2 && numThreads <= MAX_THREADS);
printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024));
memset(range,0,numThreads*sizeof(int));
apX = calloc(nn*nn,sizeof(int)); assert (apX);
Y = calloc(nn,sizeof(int)); assert (Y);
sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
A = calloc(nn,sizeof(double)); assert (A);
B = calloc(nn,sizeof(double)); assert (B);
// Inicialitzacio
/*X[0] = apX;
for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=rand()%100+1;
Y[i]=rand()%100 - 49;
X[i+1] = X[i] + nn;
}*/
X[0] = apX;
for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=90;
Y[i]=40;
X[i+1] = X[i] + nn;
}
int portion = nn*nn/numThreads;
int mod = nn*nn % numThreads;
if(mod != 0.00){
mod = mod*numThreads;
for(i=0; i<mod; i++){
range[i] = range[i] + 1;
}
}
range[0] = range[0] + portion;
for(i=1; i<numThreads; i++){
range[i] += range[i-1] + portion;
}
sumaY = 0;
pthread_mutex_init(&mutex, NULL);
for (index = 0; index < numThreads; index++)
{
assert(!pthread_create(&threads[index], NULL, parallel_code, (void *) (intptr_t)index));
}
for(index = 0; index < numThreads; index++)
{
assert(!pthread_join(threads[index], NULL ));
}
pthread_mutex_destroy(&mutex);
for (i=0;i<nn;i++) {
B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
A[i] = (sumaY -B[i]*sumaX[i])/nn;
}
// check
sA = 0;
sB = 0;
for (i=0;i<nn;i++) {
//printf("%f, %f\n",sA,sB);
sA += A[i];
sB += B[i];
}
printf("Suma elements de A: %lg B:%lg\n",sA,sB);
exit(0);
}
到目前为止,我已经完成了一些并行化,您可以在上面的代码中看到:计算每个线程必须处理的数据量(这就是变量“部分”和“mod”的用途) ),为每个部分创建了线程,创建了一个互斥量来控制 SumaY 访问...问题是当前程序只使用小值(例如 2000 作为元素数量)并且不知道为什么。可能是因为线程没有读取所有必需的列 and/or 行,因为每次程序显示错误值时,该值总是低于正确值,因此可能表明程序缺少一些要读取的数据。公平地说,我认为我非常接近正确的解决方案,所以我作为最后的资源来到这里。此外,考虑到将任务拆分为每个线程的多个部分是分配的必要条件。
非常感谢。
27/01/2021 编辑(工作代码,快速和慢速版本):
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#define N 50000
#define MAX_THREADS 256
int nn, numThreads;
int *X[N+1],*apX, *Y;
long long *sumaX, *sumaX2, sumaY, *sumaXY;
double *A, *B;
int range[MAX_THREADS];
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
// Slow version
//int visitedRows[N];
void * parallel_code(void * args){
int index = (*(int*)args);
int i, j, ini;
// Slow version
// int row, col;
if(index == 0)
ini = 0;
else
ini = range[index-1];
for(i=ini; i<range[index]; i++){
// Fast version
for (j=0;j<nn;j++) {
sumaX[i] += X[i][j];
sumaX2[i] += X[i][j] * X[i][j];
sumaXY[i] += X[i][j] * Y[j];
}
pthread_mutex_lock(&mutex);
sumaY += Y[i];
pthread_mutex_unlock(&mutex);
// Slow version
/*row = i/nn;
col = i%nn;
sumaX[row] += X[row][col];
sumaX2[row] += X[row][col] * X[row][col];
sumaXY[row] += X[row][col] * Y[col];
pthread_mutex_lock(&mutex);
if(visitedRows[row] == 0){
visitedRows[row] = 1;
sumaY += Y[row];
pthread_mutex_unlock(&mutex);
}else{
pthread_mutex_unlock(&mutex);
}*/
}
pthread_exit(0);
}
int main(int np, char*p[])
{
int i,j,index;
double sA,sB;
unsigned int thread_args[MAX_THREADS];
pthread_t threads[MAX_THREADS];
assert(np==3);
nn = atoi(p[1]);
assert(nn<=N);
srand(1);
numThreads = atoi(p[2]);
assert(numThreads >= 2 && numThreads <= MAX_THREADS);
printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024));
memset(range,0,numThreads*sizeof(int));
// Slow version
//memset(visitedRows,0,nn*sizeof(int));
apX = calloc(nn*nn,sizeof(int)); assert (apX);
Y = calloc(nn,sizeof(int)); assert (Y);
sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
A = calloc(nn,sizeof(double)); assert (A);
B = calloc(nn,sizeof(double)); assert (B);
// Inicialitzacio
X[0] = apX;
for (i=0;i<nn;i++) {
for (j=0;j<nn;j+=8)
X[i][j]=rand()%100+1;
Y[i]=rand()%100 - 49;
X[i+1] = X[i] + nn;
}
// Fast version
int portion = nn/numThreads;
int mod = nn % numThreads;
// Slow version
//int portion = nn*nn/numThreads;
//int mod = nn*nn % numThreads;
for(i=0; i<numThreads; i++){
range[i] = portion;
if (i != 0) range[i] += range[i-1];
if (i < mod) range[i]++;
}
sumaY = 0;
pthread_mutex_init(&mutex, NULL);
for (index = 0; index < numThreads; index++)
{
thread_args[index] = index;
assert(!pthread_create(&threads[index], NULL, parallel_code, &thread_args[index]));
}
for(index = 0; index < numThreads; index++)
{
assert(!pthread_join(threads[index], NULL ));
}
pthread_mutex_destroy(&mutex);
for (i=0;i<nn;i++) {
B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
A[i] = (sumaY -B[i]*sumaX[i])/nn;
}
// check
sA = sB = 0;
for (i=0;i<nn;i++) {
//printf("%f, %f\n",sA,sB);
sA += A[i];
sB += B[i];
}
printf("Suma elements de A: %lg B:%lg\n",sA,sB);
exit(0);
}
互斥体初始化和销毁
您已经通过静态初始化程序初始化了 mutex
,通过 pthread_mutex_init()
再次初始化它是错误的(除非您先使用 pthread_mutex_destroy()
将其拆除)。
此外,在加入所有线程后拆除互斥锁是没有必要的,尽管没有错。
工作分配,第 1 部分
这是错误的:
int mod = nn*nn % numThreads; if(mod != 0.00){ mod = mod*numThreads; for(i=0; i<mod; i++){ range[i] = range[i] + 1; } }
我认为您正试图在线程之间分配多余的 nn * nn
数据,但只有 mod
个多余的元素,而不是 mod * NumThreads
个。我想你的意思是
int mod = nn*nn % numThreads;
// No need to pre-test whether mod is nonzero.
// mod is used as originally computed, not multiplied by numThreads.
for (i = 0; i < mod; i++) {
range[i] = range[i] + 1;
}
原始版本不仅设置范围不正确,而且当 nn*nn % numThreads
大于 1 时,运行 还会超出数组 range
的边界。
但可能这一切都没有实际意义。见下文。
工作分配,第 2 部分
我怀疑主要问题是这些行...
sumaX[row] += X[row][col]; sumaX2[row] += X[row][col] * X[row][col]; sumaXY[row] += X[row][col] * Y[col];
... 由线程函数执行而不锁定互斥锁。 sumX
、sumX2
和sumXY
指向线程之间共享的数据,并且由于工作已经在它们之间进行了拆分,所以完全有可能多个线程贡献相同的数据元素。在那种情况下,您会发生数据竞争,并且结果行为是不确定的。
天真地,您可以通过将这些计算移动到临界区内,在 pthread_mutex_lock()
之后来解决该问题,因为现在您确实在循环的每次迭代中锁定和解锁互斥量。但这有几个问题,尤其是:
- 你会挤出大部分已经受限的线程并发机会;和
- 互斥锁操作相对昂贵,否则每次循环迭代只有少数算术运算,那么频繁地锁定和解锁互斥锁可能会影响性能。
如果您采用这种方式并行版本不比串行版本慢,我会感到惊讶。
你应该做的是将实际使用的线程数限制为最多数据行数,并以整行为基础将数据分配给线程。不应将任何行拆分为两个或多个线程。这将在不破坏并行化目的的情况下消除上述数据竞争。
我还会修改线程函数,以便它仅在将每行结果添加到全局总和时才锁定互斥锁。这将使您获得比现在更多的并发性。
这将使您在线程之间的数据分配不那么均匀,并且总体上可能会减少线程数,但是无论如何,拥有比 运行 执行单元更多的线程对您没有帮助。当行数相对于线程数较大时,不均匀的影响不会很明显,而在小情况下,总体 运行 时间首先并不是什么大问题.更重要的是,计算应该产生正确的结果,并且锁定的减少应该会显着提高性能。