我如何并行化 C 中的下一个代码,以便它遍历所有行和列? (线性回归程序)

How do I parallelize the next code in C so it iterates through all rows and columns? (linear regression program)

我正在通过线性回归计算程序在 C 中进行并行化分配,但我只是应该在线性计算之前并行化计算所有加法的部分。

原始代码。参数:元素数量

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>

#define N 50000

int nn;
int *X[N+1],*apX, *Y;
long long *sumaX, *sumaX2, sumaY, *sumaXY; 
double *A, *B;

int main(int np, char*p[])
{
    int i,j;
    double sA,sB;
    clock_t ta,t;

    assert(np==2);

    nn = atoi(p[1]);
    assert(nn<=N);
    srand(1);

    printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024)); 

    apX = calloc(nn*nn,sizeof(int)); assert (apX);
    Y = calloc(nn,sizeof(int)); assert (Y);
    sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
    sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
    sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
    A = calloc(nn,sizeof(double)); assert (A);
    B = calloc(nn,sizeof(double)); assert (B);

    // Initialization
    X[0] = apX;
    /*for (i=0;i<nn;i++) {
        for (j=0;j<nn;j+=8)            
            X[i][j]=rand()%100+1;
        Y[i]=rand()%100 - 49;
    X[i+1] = X[i] + nn;
    }*/
    for (i=0;i<nn;i++) {
        for (j=0;j<nn;j+=8)            
            X[i][j]=90;
        Y[i]=40;
    X[i+1] = X[i] + nn;
    }

    // add (parallelization part)
    sumaY = 0;
    for (i=0;i<nn;i++) {
    sumaX[i] = sumaX2[i] = sumaXY[i] = 0;
        for (j=0;j<nn;j++) {
        sumaX[i] += X[i][j];
        sumaX2[i] += X[i][j] * X[i][j];
        sumaXY[i] += X[i][j] * Y[j];
    }
    sumaY += Y[i];
    }

    // linearity calculation
    for (i=0;i<nn;i++) {
    B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
    B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
    A[i] = (sumaY -B[i]*sumaX[i])/nn;
    }

    // check
    sA = sB = 0;
    for (i=0;i<nn;i++) {
            //printf("%lg, %lg\n",A[i],B[i]);
        sA += A[i];
        sB += B[i];
    }

    printf("Suma elements de A: %lg B:%lg\n",sA,sB);

exit(0);
}

并行化。参数:元素和线程数

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>

#define N 50000
#define MAX_THREADS 256

int nn, numThreads;
int *X[N+1],*apX, *Y;
long *sumaX, *sumaX2, sumaY, *sumaXY;
double *A, *B;
int range[MAX_THREADS];
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
int ret;

void * parallel_code(void * id){
    int index = (intptr_t) id;
    int i, ini, row, col;
    int rowAux = -5;

    if(index == 0)
        ini = 0;
    else
        ini = range[index-1];

    for(i=ini; i<range[index]; i++){
        row = i/nn;
        col = i%nn;

        sumaX[row] += X[row][col];
        sumaX2[row] += X[row][col] * X[row][col];
        sumaXY[row] += X[row][col] * Y[col];

        pthread_mutex_lock(&mutex);
        if(rowAux != row){
            sumaY += Y[row];
            rowAux = row;
            pthread_mutex_unlock(&mutex);
        }else{
            pthread_mutex_unlock(&mutex);
        }
    }
    pthread_exit(0);
}

int main(int np, char*p[])
{
    int i,j,index;
    double sA,sB;
    clock_t ta,t;
    pthread_t threads[MAX_THREADS];

    assert(np==3);

    nn = atoi(p[1]);
    assert(nn<=N);
    srand(1);

    numThreads = atoi(p[2]);
    assert(numThreads >= 2 && numThreads <= MAX_THREADS);

    printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024)); 

    memset(range,0,numThreads*sizeof(int));

    apX = calloc(nn*nn,sizeof(int)); assert (apX);
    Y = calloc(nn,sizeof(int)); assert (Y);
    sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
    sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
    sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
    A = calloc(nn,sizeof(double)); assert (A);
    B = calloc(nn,sizeof(double)); assert (B);

    // Inicialitzacio
    /*X[0] = apX;
    for (i=0;i<nn;i++) {
        for (j=0;j<nn;j+=8)            
            X[i][j]=rand()%100+1;
        Y[i]=rand()%100 - 49;
    X[i+1] = X[i] + nn;
    }*/
    X[0] = apX;
    for (i=0;i<nn;i++) {
        for (j=0;j<nn;j+=8)            
            X[i][j]=90;
        Y[i]=40;
    X[i+1] = X[i] + nn;
    }

    int portion = nn*nn/numThreads;
    int mod = nn*nn % numThreads;

    if(mod != 0.00){
        mod = mod*numThreads;
        for(i=0; i<mod; i++){
            range[i] = range[i] + 1;
        }
    }

    range[0] = range[0] + portion;
    for(i=1; i<numThreads; i++){
        range[i] += range[i-1] + portion;
    }

    sumaY = 0;
    pthread_mutex_init(&mutex, NULL);
    for (index = 0; index < numThreads; index++)
    {
        assert(!pthread_create(&threads[index], NULL, parallel_code, (void *) (intptr_t)index));
    }

    for(index = 0; index < numThreads; index++)
    {
        assert(!pthread_join(threads[index], NULL ));
    }
    pthread_mutex_destroy(&mutex);

    for (i=0;i<nn;i++) {
    B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
    B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
    A[i] = (sumaY -B[i]*sumaX[i])/nn;
    }

    // check
    sA = 0;
    sB = 0;
    for (i=0;i<nn;i++) {
            //printf("%f, %f\n",sA,sB);
        sA += A[i];
        sB += B[i];
    }
    printf("Suma elements de A: %lg B:%lg\n",sA,sB);

exit(0);
}

到目前为止,我已经完成了一些并行化,您可以在上面的代码中看到:计算每个线程必须处理的数据量(这就是变量“部分”和“mod”的用途) ),为每个部分创建了线程,创建了一个互斥量来控制 SumaY 访问...问题是当前程序只使用小值(例如 2000 作为元素数量)并且不知道为什么。可能是因为线程没有读取所有必需的列 and/or 行,因为每次程序显示错误值时,该值总是低于正确值,因此可能表明程序缺少一些要读取的数据。公平地说,我认为我非常接近正确的解决方案,所以我作为最后的资源来到这里。此外,考虑到将任务拆分为每个线程的多个部分是分配的必要条件。

非常感谢。

27/01/2021 编辑(工作代码,快速和慢速版本):

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>

#define N 50000
#define MAX_THREADS 256

int nn, numThreads;
int *X[N+1],*apX, *Y;
long long *sumaX, *sumaX2, sumaY, *sumaXY;
double *A, *B;
int range[MAX_THREADS];
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
// Slow version
//int visitedRows[N];

void * parallel_code(void * args){
    int index = (*(int*)args);
    int i, j, ini;
    // Slow version
    // int row, col;

    if(index == 0)
        ini = 0;
    else
        ini = range[index-1];
    
    for(i=ini; i<range[index]; i++){
        // Fast version
        for (j=0;j<nn;j++) {
            sumaX[i] += X[i][j];
            sumaX2[i] += X[i][j] * X[i][j];
            sumaXY[i] += X[i][j] * Y[j];
        }
        pthread_mutex_lock(&mutex);
        sumaY += Y[i];
        pthread_mutex_unlock(&mutex);

        // Slow version
        /*row = i/nn;
        col = i%nn;
        sumaX[row] += X[row][col];
        sumaX2[row] += X[row][col] * X[row][col];
        sumaXY[row] += X[row][col] * Y[col];
        
        pthread_mutex_lock(&mutex);
        if(visitedRows[row] == 0){
            visitedRows[row] = 1;
            sumaY += Y[row];
            pthread_mutex_unlock(&mutex);
        }else{
            pthread_mutex_unlock(&mutex);
        }*/
    }
    pthread_exit(0);
}

int main(int np, char*p[])
{
    int i,j,index;
    double sA,sB;
    unsigned int thread_args[MAX_THREADS];
    pthread_t threads[MAX_THREADS];

    assert(np==3);

    nn = atoi(p[1]);
    assert(nn<=N);
    srand(1);

    numThreads = atoi(p[2]);
    assert(numThreads >= 2 && numThreads <= MAX_THREADS);

    printf("Dimensio dades =~ %g Mbytes\n",((double)(nn*(nn+11))*4)/(1024*1024)); 

    memset(range,0,numThreads*sizeof(int));
    // Slow version
    //memset(visitedRows,0,nn*sizeof(int));

    apX = calloc(nn*nn,sizeof(int)); assert (apX);
    Y = calloc(nn,sizeof(int)); assert (Y);
    sumaX = calloc(nn,sizeof(long long)); assert (sumaX);
    sumaX2 = calloc(nn,sizeof(long long)); assert (sumaX2);
    sumaXY = calloc(nn,sizeof(long long)); assert (sumaXY);
    A = calloc(nn,sizeof(double)); assert (A);
    B = calloc(nn,sizeof(double)); assert (B);
    // Inicialitzacio
    X[0] = apX;
    for (i=0;i<nn;i++) {
        for (j=0;j<nn;j+=8)            
            X[i][j]=rand()%100+1;
        Y[i]=rand()%100 - 49;
    X[i+1] = X[i] + nn;
    }

    // Fast version
    int portion = nn/numThreads;
    int mod = nn % numThreads;

    // Slow version
    //int portion = nn*nn/numThreads;
    //int mod = nn*nn % numThreads;
    
    for(i=0; i<numThreads; i++){
        range[i] = portion;
        if (i != 0) range[i] += range[i-1];
        if (i < mod) range[i]++;
    }
    
    sumaY = 0;
    pthread_mutex_init(&mutex, NULL);
    for (index = 0; index < numThreads; index++)
    {
        thread_args[index] = index;
        assert(!pthread_create(&threads[index], NULL, parallel_code, &thread_args[index]));
    }

    for(index = 0; index < numThreads; index++)
    {
        assert(!pthread_join(threads[index], NULL ));
    }
    pthread_mutex_destroy(&mutex);

    for (i=0;i<nn;i++) {
    B[i] = sumaXY[i] - (sumaX[i] * sumaY)/nn;
    B[i] = B[i] / (sumaX2[i] - (sumaX[i] * sumaX[i])/nn);
    A[i] = (sumaY -B[i]*sumaX[i])/nn;
    }

    // check
    sA = sB = 0;
    for (i=0;i<nn;i++) {
            //printf("%f, %f\n",sA,sB);
        sA += A[i];
        sB += B[i];
    }
    printf("Suma elements de A: %lg B:%lg\n",sA,sB);

exit(0);
}

互斥体初始化和销毁​​

您已经通过静态初始化程序初始化了 mutex,通过 pthread_mutex_init() 再次初始化它是错误的(除非您先使用 pthread_mutex_destroy() 将其拆除)。

此外,在加入所有线程后拆除互斥锁是没有必要的,尽管没有错。

工作分配,第 1 部分

这是错误的:

    int mod = nn*nn % numThreads;

    if(mod != 0.00){
        mod = mod*numThreads;
        for(i=0; i<mod; i++){
            range[i] = range[i] + 1;
        }
    }

我认为您正试图在线程之间分配多余的 nn * nn 数据,但只有 mod 个多余的元素,而不是 mod * NumThreads 个。我想你的意思是

    int mod = nn*nn % numThreads;

    // No need to pre-test whether mod is nonzero.
    // mod is used as originally computed, not multiplied by numThreads.
    for (i = 0; i < mod; i++) {
        range[i] = range[i] + 1;
    }

原始版本不仅设置范围不正确,而且当 nn*nn % numThreads 大于 1 时,运行 还会超出数组 range 的边界。

但可能这一切都没有实际意义。见下文。

工作分配,第 2 部分

我怀疑主要问题是这些行...

        sumaX[row] += X[row][col];
        sumaX2[row] += X[row][col] * X[row][col];
        sumaXY[row] += X[row][col] * Y[col];

... 由线程函数执行而不锁定互斥锁。 sumXsumX2sumXY指向线程之间共享的数据,并且由于工作已经在它们之间进行了拆分,所以完全有可能多个线程贡献相同的数据元素。在那种情况下,您会发生数据竞争,并且结果行为是不确定的。

天真地,您可以通过将这些计算移动到临界区内,在 pthread_mutex_lock() 之后来解决该问题,因为现在您确实在循环的每次迭代中锁定和解锁互斥量。但这有几个问题,尤其是:

  • 你会挤出大部分已经受限的线程并发机会;和
  • 互斥锁操作相对昂贵,否则每次循环迭代只有少数算术运算,那么频繁地锁定和解锁互斥锁可能会影响性能。

如果您采用这种方式并行版本不比串行版本慢,我会感到惊讶。

你应该做的是将实际使用的线程数限制为最多数据行数,并以整行为基础将数据分配给线程。不应将任何行拆分为两个或多个线程。这将在不破坏并行化目的的情况下消除上述数据竞争。

我还会修改线程函数,以便它仅在将每行结果添加到全局总和时才锁定互斥锁。这将使您获得比现在更多的并发性。

这将使您在线程之间的数据分配不那么均匀,并且总体上可能会减少线程数,但是无论如何,拥有比 运行 执行单元更多的线程对您没有帮助。当行数相对于线程数较大时,不均匀的影响不会很明显,而在小情况下,总体 运行 时间首先并不是什么大问题.更重要的是,计算应该产生正确的结果,并且锁定的减少应该会显着提高性能。