Synchronization of p_threads using globals

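I'm new to C, so I'm not sure where to start digging into my problem. I'm trying to port a Python number-crunching algorithm to C, and since there is no GIL in C (woohoo), I can change anything in memory from the threads, as long as I make sure there are no races.

I did my homework on mutexes; however, I can't wrap my head around using mutexes when continuously running threads access the same array over and over again.

I'm using p_threads to split the workload on a big array a[N]. The number-crunching algorithm on a[N] is additive, so I split it up using an a_diff[N_THREADS][N] array: each thread writes the changes it wants applied to a[N] into a_diff[N_THREADS][N], and the changes are merged together after each step.

I need to run the crunching on different versions of array a[N], so I pass them via the global pointer p (in the MWE, there is only one a[N]).

I'm synchronizing the threads using another global array, SYNC_THREADS[N_THREADS], and I make sure the threads quit when I need them to by setting the END_THREADS global (I know, I'm using too many globals - I don't care, the code is about 200 lines). My question is about this synchronization technique: is it safe to do it this way, and what is a cleaner/better/faster way to achieve it?

MWE: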

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N_THREADS 3
#define N 10000000
#define STEPS 3

double a[N];  // main array
double a_diff[N_THREADS][N];  // diffs array
double params[N];  // parameter used for number-crunching
double (*p)[N];  // pointer to array[N]

// structure for bounds for crunching the array
struct bounds {
    int lo;
    int hi;
    int thread_num;
};
struct bounds B[N_THREADS];
int SYNC_THREADS[N_THREADS];  // for syncing threads
int END_THREADS = 0;  // signal to terminate threads


static void *crunching(void *arg) {
    // multiple threads run number-crunching operations according to assigned low/high bounds
    struct bounds *data = (struct bounds *)arg;
    int lo = (*data).lo;
    int hi = (*data).hi;
    int thread_num = (*data).thread_num;
    printf("worker %d started for bounds [%d %d] \n", thread_num, lo, hi);

    int i;

    while (END_THREADS != 1) {  // END_THREADS tells threads to terminate
        if (SYNC_THREADS[thread_num] == 1) {  // SYNC_THREADS allows threads to start number-crunching
            printf("worker %d working... \n", thread_num );
            for (i = lo; i <= hi; ++i) {
                a_diff[thread_num][i] += (*p)[i] * params[i];  // pretend this is an expensive operation...
            }
            SYNC_THREADS[thread_num] = 0;  // thread disables itself until SYNC_THREADS is back to 1
            printf("worker %d stopped... \n", thread_num );
        }
    }
    return 0;
}


int i, j, th,s;
double joiner;

int main() {
    // pre-fill arrays
    for (i = 0; i < N; ++i) {
        a[i] = i + 0.5;
        params[i] = 0.0;
    }

    // split workload between workers
    int worker_length = N / N_THREADS;
    for (i = 0; i < N_THREADS; ++i) {
        B[i].thread_num = i;
        B[i].lo = i * worker_length;
        if (i == N_THREADS - 1) {
            B[i].hi = N;
        } else {
            B[i].hi = i * worker_length + worker_length - 1;
        }
    }
    // pointer to parameters to be passed to worker
    struct bounds **data = malloc(N_THREADS * sizeof(struct bounds*));
    for (i = 0; i < N_THREADS; i++) {
        data[i] = malloc(sizeof(struct bounds));
        data[i]->lo = B[i].lo;
        data[i]->hi = B[i].hi;
        data[i]->thread_num = B[i].thread_num;
    }
    // create thread objects
    pthread_t threads[N_THREADS];

    // disallow threads to crunch numbers
    for (th = 0; th < N_THREADS; ++th) {
        SYNC_THREADS[th] = 0;
    }

    // launch workers
    for(th = 0; th < N_THREADS; th++) {
        pthread_create(&threads[th], NULL, crunching, data[th]);
    }

    // big loop of iterations
    for (s = 0; s < STEPS; ++s) {
        for (i = 0; i < N; ++i) {
            params[i] += 1.0;  // adjust parameters
        }

        // zero diff array
        for (i = 0; i < N; ++i) {
            for (th = 0; th < N_THREADS; ++th) {
                a_diff[th][i] = 0.0;
            }
        }
        p = &a;  // pointer to array a
        // allow threads to process numbers and wait for threads to complete
        for (th = 0; th < N_THREADS; ++th) { SYNC_THREADS[th] = 1; }
        // ...here threads started by pthread_create do calculations...
        for (th = 0; th < N_THREADS; th++) { while (SYNC_THREADS[th] != 0) {} }

        // join results from threads (number-crunching is additive)
        for (i = 0; i < N; ++i) {
            joiner = 0.0;
            for (th = 0; th < N_THREADS; ++th) {
                joiner += a_diff[th][i];
            }
            a[i] += joiner;
        }
    }


    // join workers
    END_THREADS = 1;
    for(th = 0; th < N_THREADS; th++) {
        pthread_join(threads[th], NULL);
    }

    return 0;
}

I observe that the workers don't overlap in time:

worker 0 started for bounds [0 3333332]
worker 1 started for bounds [3333333 6666665]
worker 2 started for bounds [6666666 10000000]
worker 0 working...
worker 1 working...
worker 2 working...
worker 2 stopped...
worker 0 stopped...
worker 1 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 0 stopped...
worker 2 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 2 stopped...
worker 0 stopped...

Process returned 0 (0x0)   execution time : 1.505 s

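and I keep them apart through the a_diff[thread_num][N] sub-arrays to make sure the workers don't step into each other's workspace. However, I'm not sure that this is always the case and that I haven't introduced a hidden race somewhere...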

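I didn't realize what the question was :-)

So, the question is whether you thought the SYNC_THREADS and END_THREADS synchronization mechanism through properly.
Yes! ...almost. The problem is that the threads are burning CPU while they wait.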

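Condition variables

To make threads wait for an event, you need condition variables (pthread_cond). They offer a few useful functions such as wait(), signal() and broadcast():

  • wait(&cond, &m) blocks the calling thread on the given condition variable. [Note 2]
  • signal(&cond) unblocks one thread waiting on the given condition variable.
  • broadcast(&cond) unblocks all threads waiting on the given condition variable.

Initially you would have all the threads waiting [Note 1]: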

pthread_mutex_lock(&mutex);
while(!start_threads)
  pthread_cond_wait(&cond_start, &mutex);
pthread_mutex_unlock(&mutex);

And, when the main thread is ready:

pthread_mutex_lock(&mutex);
start_threads = 1;
pthread_cond_broadcast(&cond_start);
pthread_mutex_unlock(&mutex);
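
To see the two fragments working together, here is a minimal sketch of a complete start gate. The names worker, N_WORKERS, started_count and run_start_gate_demo are made up for this example and are not part of your code; mutex, cond_start and start_threads are the same ones used above.

```c
#include <assert.h>
#include <pthread.h>

#define N_WORKERS 3  /* hypothetical worker count for this sketch */

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond_start = PTHREAD_COND_INITIALIZER;
static int start_threads = 0;  /* the predicate, protected by mutex */
static int started_count = 0;  /* workers that got past the gate, protected by mutex */

static void *worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mutex);
    while (!start_threads)                       /* loop guards against spurious wakeups */
        pthread_cond_wait(&cond_start, &mutex);  /* releases mutex while blocked */
    started_count++;
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/* Starts the workers, opens the gate, and returns how many workers got through. */
int run_start_gate_demo(void) {
    pthread_t threads[N_WORKERS];
    int rc;

    start_threads = 0;  /* reset so the demo can be called more than once */
    started_count = 0;

    for (int i = 0; i < N_WORKERS; i++) {
        rc = pthread_create(&threads[i], NULL, worker, NULL);
        assert(rc == 0);
    }

    /* The main thread is ready: change the predicate under the lock and wake everyone. */
    pthread_mutex_lock(&mutex);
    start_threads = 1;
    pthread_cond_broadcast(&cond_start);
    pthread_mutex_unlock(&mutex);

    for (int i = 0; i < N_WORKERS; i++) {
        rc = pthread_join(threads[i], NULL);
        assert(rc == 0);
    }
    return started_count;  /* safe to read: all workers have been joined */
}
```

A worker that only reaches the gate after the broadcast still gets through, because it checks start_threads before it ever calls pthread_cond_wait(). While they are blocked, the workers use no CPU, unlike the polling loop on SYNC_THREADS.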

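Barriers

If there are data dependencies between iterations, you need to make sure that all threads are executing the same iteration at any given moment.

To synchronize the threads at the end of each iteration, take a look at barriers (pthread_barrier):

  • pthread_barrier_init(&b, NULL, count): initializes the barrier to synchronize count threads.
  • pthread_barrier_wait(&b): the thread waits here until all count threads have reached the barrier.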
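
As a rough sketch of how this could replace the SYNC_THREADS polling, the main thread can take part in the barrier as well (count = workers + 1): one wait releases the workers for a step, a second wait tells main that every slice is finished. Everything here is simplified and assumed for illustration: the arrays are tiny, a single diff array stands in for a_diff[N_THREADS][N], the "expensive" work is just a copy, and the names step_barrier, data, diff and run_barrier_demo are made up. It also assumes a platform that provides POSIX barriers (they are an optional feature).

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

#define N_WORKERS 3
#define N_ITEMS 9
#define N_STEPS 4

static pthread_barrier_t step_barrier;
static double data[N_ITEMS];  /* plays the role of a[N] */
static double diff[N_ITEMS];  /* each worker writes only its own slice */

struct range { int lo, hi; };  /* half-open interval [lo, hi) */

static void *worker(void *arg) {
    const struct range *r = arg;
    for (int s = 0; s < N_STEPS; s++) {
        pthread_barrier_wait(&step_barrier);  /* wait until main has prepared step s */
        for (int i = r->lo; i < r->hi; i++)
            diff[i] = data[i];                /* stand-in for the expensive computation */
        pthread_barrier_wait(&step_barrier);  /* report that this slice is finished */
    }
    return NULL;
}

/* Runs N_STEPS doubling steps; returns how many items ended up equal to 2^N_STEPS. */
int run_barrier_demo(void) {
    pthread_t threads[N_WORKERS];
    struct range ranges[N_WORKERS];
    int rc, ok = 0;

    for (int i = 0; i < N_ITEMS; i++) data[i] = 1.0;
    rc = pthread_barrier_init(&step_barrier, NULL, N_WORKERS + 1);  /* workers + main */
    assert(rc == 0);

    for (int t = 0; t < N_WORKERS; t++) {
        ranges[t].lo = t * (N_ITEMS / N_WORKERS);
        ranges[t].hi = (t == N_WORKERS - 1) ? N_ITEMS : (t + 1) * (N_ITEMS / N_WORKERS);
        rc = pthread_create(&threads[t], NULL, worker, &ranges[t]);
        assert(rc == 0);
    }
    for (int s = 0; s < N_STEPS; s++) {
        pthread_barrier_wait(&step_barrier);  /* release the workers for step s */
        pthread_barrier_wait(&step_barrier);  /* block until every worker is done */
        for (int i = 0; i < N_ITEMS; i++)
            data[i] += diff[i];               /* merge: doubles every item */
    }
    for (int t = 0; t < N_WORKERS; t++) pthread_join(threads[t], NULL);
    pthread_barrier_destroy(&step_barrier);

    for (int i = 0; i < N_ITEMS; i++) ok += (data[i] == 16.0);  /* 2^4 after 4 steps */
    return ok;
}
```

While main merges the results, the workers are already blocked on the first wait of the next step, so nobody reads data while it is being modified.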

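Extending the functionality of barriers

Sometimes you will want the last thread to reach the barrier to compute something (e.g. to increment the iteration counter, to compute some global value, or to check whether execution should stop). You have two alternatives.

Using pthread_barriers

You basically need two barrier waits, one before the extra computation and one after it: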

int rc = pthread_barrier_wait(&b);
// exactly one (unspecified) thread gets PTHREAD_BARRIER_SERIAL_THREAD
if(rc == PTHREAD_BARRIER_SERIAL_THREAD)
  if(shouldStop()) stop = 1;
pthread_barrier_wait(&b);
if(stop) return;
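
A complete version of that fragment might look like the sketch below. The stop criterion (stop after MAX_ROUNDS rounds) and the names work_count, rounds_done and run_serial_thread_demo are assumptions made up for the example; it also assumes your platform provides POSIX barriers.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>

#define N_WORKERS 4
#define MAX_ROUNDS 5  /* hypothetical stop criterion */

static pthread_barrier_t b;
static int rounds_done = 0;  /* written only by the serial thread, between the two waits */
static int stop = 0;         /* same */
static int work_count[N_WORKERS];

static void *worker(void *arg) {
    int id = *(int *)arg;
    for (;;) {
        work_count[id]++;  /* stand-in for one iteration of real work */

        int rc = pthread_barrier_wait(&b);
        if (rc == PTHREAD_BARRIER_SERIAL_THREAD) {
            /* exactly one thread per round runs this while the others wait below */
            if (++rounds_done == MAX_ROUNDS) stop = 1;
        }
        pthread_barrier_wait(&b);  /* after this, everyone sees the same value of stop */
        if (stop) return NULL;
    }
}

/* Runs the workers until the serial thread sets stop; returns the number of rounds. */
int run_serial_thread_demo(void) {
    pthread_t threads[N_WORKERS];
    int ids[N_WORKERS];
    int rc = pthread_barrier_init(&b, NULL, N_WORKERS);
    assert(rc == 0);

    for (int t = 0; t < N_WORKERS; t++) {
        ids[t] = t;
        rc = pthread_create(&threads[t], NULL, worker, &ids[t]);
        assert(rc == 0);
    }
    for (int t = 0; t < N_WORKERS; t++) pthread_join(threads[t], NULL);
    pthread_barrier_destroy(&b);
    return rounds_done;
}
```

The second wait is what makes the unprotected stop flag safe here: it is written only between the two waits and read only after the second one.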

Using pthread_cond to implement our own specialized barrier

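pthread_mutex_lock(&mutex);
remainingThreads--;
// all threads execute this
executedByAllThreads();
if(remainingThreads == 0) {
  // reinitialize barrier
  remainingThreads = N;
  // only last thread executes this
  if(shouldStop()) stop = 1;
  // generation is an extra shared counter (assumed here); without it the
  // waiters would see remainingThreads == N again and go back to sleep
  generation++;
  pthread_cond_broadcast(&cond);
} else {
  int myGeneration = generation;
  while(myGeneration == generation)
    pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);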
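
If you want to reuse this, the same idea can be wrapped in a small struct. This is only a sketch under my own assumptions: the names my_barrier, my_barrier_init, my_barrier_wait and the on_last callback are hypothetical, and the per-round generation counter is one common way to tell a real wakeup from a spurious one.

```c
#include <assert.h>
#include <pthread.h>

struct my_barrier {       /* hypothetical helper, not a pthread type */
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int total;            /* number of threads to synchronize */
    int remaining;        /* threads still missing in this round */
    unsigned generation;  /* incremented once per completed round */
};

static void my_barrier_init(struct my_barrier *bar, int total) {
    pthread_mutex_init(&bar->mutex, NULL);
    pthread_cond_init(&bar->cond, NULL);
    bar->total = total;
    bar->remaining = total;
    bar->generation = 0;
}

/* on_last (may be NULL) runs in the last thread to arrive, while the mutex is
   held and before the other threads are woken up. */
static void my_barrier_wait(struct my_barrier *bar, void (*on_last)(void *), void *ctx) {
    pthread_mutex_lock(&bar->mutex);
    if (--bar->remaining == 0) {
        bar->remaining = bar->total;  /* reinitialize for the next round */
        if (on_last) on_last(ctx);
        bar->generation++;
        pthread_cond_broadcast(&bar->cond);
    } else {
        unsigned my_generation = bar->generation;
        while (my_generation == bar->generation)  /* survives spurious wakeups */
            pthread_cond_wait(&bar->cond, &bar->mutex);
    }
    pthread_mutex_unlock(&bar->mutex);
}

#define N_WORKERS 3
#define N_ROUNDS 100

static struct my_barrier bar;
static int rounds = 0;  /* only touched inside on_last, i.e. under bar.mutex */

static void count_round(void *ctx) { (*(int *)ctx)++; }

static void *worker(void *arg) {
    (void)arg;
    for (int r = 0; r < N_ROUNDS; r++)
        my_barrier_wait(&bar, count_round, &rounds);
    return NULL;
}

/* Runs N_ROUNDS rounds on N_WORKERS threads; returns how often on_last ran. */
int run_my_barrier_demo(void) {
    pthread_t threads[N_WORKERS];
    int rc;
    rounds = 0;
    my_barrier_init(&bar, N_WORKERS);
    for (int t = 0; t < N_WORKERS; t++) {
        rc = pthread_create(&threads[t], NULL, worker, NULL);
        assert(rc == 0);
    }
    for (int t = 0; t < N_WORKERS; t++) pthread_join(threads[t], NULL);
    pthread_cond_destroy(&bar.cond);
    pthread_mutex_destroy(&bar.mutex);
    return rounds;
}
```

Because waiters compare against the generation they saw on arrival, a fast thread that re-enters the barrier for the next round cannot keep the slower ones from leaving the previous one.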

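Note 1: Why is pthread_cond_wait() inside a while block? It may look a bit odd. The reason behind it is the existence of spurious wakeups: the function may return even if no signal() or broadcast() was issued. So, to guarantee correctness, it is usual to have an extra variable which guarantees that if a thread wakes up before it should, it goes back into pthread_cond_wait().

From the manual: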

When using condition variables there is always a Boolean predicate involving shared variables associated with each condition wait that is true if the thread should proceed. Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return.

(...)

If a signal is delivered to a thread waiting for a condition variable, upon return from the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or it shall return zero due to spurious wakeup.

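Note 2:

Michael Burr pointed out in the comments that you should hold the companion lock whenever you modify the predicate (start_threads) and whenever you call pthread_cond_wait(). pthread_cond_wait() releases the mutex when it is called and reacquires it when it returns.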

PS: It is a bit late here; sorry if my text is confusing :-)