Synchronization of p_threads using globals
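I'm new to C, so I'm not sure where to start digging into my problem.
I'm trying to port a Python number-crunching algorithm to C. Since there is no GIL in C (woohoo), I can change anything in memory from threads, as long as I make sure there are no races.
I did my homework on mutexes, but I can't wrap my head around using them when continuously running threads access the same array over and over.
I'm using p_threads to split the workload over a big array a[N].
The number-crunching algorithm on array a[N] is additive, so I split it with an a_diff[N_THREADS][N] array: each thread writes the changes it would apply to a[N] into its own row of a_diff[N_THREADS][N], and the rows are merged into a[N] after each step.
I need to run the crunching on different versions of the array a[N], so I pass them through a global pointer p (in the MWE there is only one a[N]).
I'm using another global array, SYNC_THREADS[N_THREADS], to synchronize the threads, and I make sure the threads quit when I need them to by setting the END_THREADS global (I know, I'm using too many globals; I don't care, the code is about 200 lines). My question is about this synchronization technique: is it safe to do this? What is a cleaner/better/faster way to do it?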
MWE:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N_THREADS 3
#define N 10000000
#define STEPS 3
double a[N]; // main array
double a_diff[N_THREADS][N]; // diffs array
double params[N]; // parameter used for number-crunching
double (*p)[N]; // pointer to array[N]
// structure for bounds for crunching the array
struct bounds {
int lo;
int hi;
int thread_num;
};
struct bounds B[N_THREADS];
int SYNC_THREADS[N_THREADS]; // for syncing threads
int END_THREADS = 0; // signal to terminate threads
static void *crunching(void *arg) {
// multiple threads run number-crunching operations according to assigned low/high bounds
struct bounds *data = (struct bounds *)arg;
int lo = (*data).lo;
int hi = (*data).hi;
int thread_num = (*data).thread_num;
printf("worker %d started for bounds [%d %d] \n", thread_num, lo, hi);
int i;
while (END_THREADS != 1) { // END_THREADS tells threads to terminate
if (SYNC_THREADS[thread_num] == 1) { // SYNC_THREADS allows threads to start number-crunching
printf("worker %d working... \n", thread_num );
for (i = lo; i <= hi; ++i) {
a_diff[thread_num][i] += (*p)[i] * params[i]; // pretend this is an expensive operation...
}
SYNC_THREADS[thread_num] = 0; // thread disables itself until SYNC_THREADS is back to 1
printf("worker %d stopped... \n", thread_num );
}
}
return 0;
}
int i, j, th,s;
double joiner;
int main() {
// pre-fill arrays
for (i = 0; i < N; ++i) {
a[i] = i + 0.5;
params[i] = 0.0;
}
// split workload between workers
int worker_length = N / N_THREADS;
for (i = 0; i < N_THREADS; ++i) {
B[i].thread_num = i;
B[i].lo = i * worker_length;
if (i == N_THREADS - 1) {
B[i].hi = N - 1; // last valid index: the worker loop uses i <= hi, so N would overrun the arrays
} else {
B[i].hi = i * worker_length + worker_length - 1;
}
}
// pointer to parameters to be passed to worker
struct bounds **data = malloc(N_THREADS * sizeof(struct bounds*));
for (i = 0; i < N_THREADS; i++) {
data[i] = malloc(sizeof(struct bounds));
data[i]->lo = B[i].lo;
data[i]->hi = B[i].hi;
data[i]->thread_num = B[i].thread_num;
}
// create thread objects
pthread_t threads[N_THREADS];
// disallow threads to crunch numbers
for (th = 0; th < N_THREADS; ++th) {
SYNC_THREADS[th] = 0;
}
// launch workers
for(th = 0; th < N_THREADS; th++) {
pthread_create(&threads[th], NULL, crunching, data[th]);
}
// big loop of iterations
for (s = 0; s < STEPS; ++s) {
for (i = 0; i < N; ++i) {
params[i] += 1.0; // adjust parameters
} // close here: the loops below reuse i, so nesting them ended this loop after params[0]
// zero diff array
for (i = 0; i < N; ++i) {
for (th = 0; th < N_THREADS; ++th) {
a_diff[th][i] = 0.0;
}
}
p = &a; // pointer to array a
// allow threads to process numbers and wait for threads to complete
for (th = 0; th < N_THREADS; ++th) { SYNC_THREADS[th] = 1; }
// ...here threads started by pthread_create do calculations...
for (th = 0; th < N_THREADS; th++) { while (SYNC_THREADS[th] != 0) {} }
// join results from threads (number-crunching is additive)
for (i = 0; i < N; ++i) {
joiner = 0.0;
for (th = 0; th < N_THREADS; ++th) {
joiner += a_diff[th][i];
}
a[i] += joiner;
}
}
// join workers
END_THREADS = 1;
for(th = 0; th < N_THREADS; th++) {
pthread_join(threads[th], NULL);
}
return 0;
}
I see that the workers don't overlap in time:
worker 0 started for bounds [0 3333332]
worker 1 started for bounds [3333333 6666665]
worker 2 started for bounds [6666666 10000000]
worker 0 working...
worker 1 working...
worker 2 working...
worker 2 stopped...
worker 0 stopped...
worker 1 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 0 stopped...
worker 2 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 2 stopped...
worker 0 stopped...
Process returned 0 (0x0) execution time : 1.505 s
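and I keep them apart through the a_diff[thread_num][N] sub-arrays so that the workers don't step into each other's workspace. However, I'm not sure this is always the case, or that I haven't introduced a race hidden somewhere...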
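I didn't realize what the question was :-)
So, the question is whether your SYNC_THREADS and END_THREADS synchronization mechanism is well thought out.
Yes!... almost. The problem is that the threads busy-wait: the workers spinning in while (END_THREADS != 1) and the main thread spinning on SYNC_THREADS all burn CPU while they wait.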
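Condition variables
To make threads wait for an event, you need condition variables (pthread_cond). They provide a few useful functions, such as wait(), signal() and broadcast():
pthread_cond_wait(&cond, &m) blocks the thread on the given condition variable. [Note 2]
pthread_cond_signal(&cond) unblocks (at least) one thread waiting on the given condition variable.
pthread_cond_broadcast(&cond) unblocks all threads waiting on the given condition variable.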
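Initially you'd make all threads wait [Note 1]:
pthread_mutex_lock(&m);
while (!start_threads)
    pthread_cond_wait(&cond_start, &m);
pthread_mutex_unlock(&m);
And, when the main thread is ready:
pthread_mutex_lock(&m);
start_threads = 1;
pthread_cond_broadcast(&cond_start);
pthread_mutex_unlock(&m);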
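Barriers
If there are data dependencies between iterations, you need to make sure that all threads are working on the same iteration at any given moment.
To synchronize the threads at the end of each iteration, look at barriers (pthread_barrier):
pthread_barrier_init(&b, NULL, count): initializes the barrier to synchronize count threads.
pthread_barrier_wait(&b): each thread waits here until all count threads have reached the barrier.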
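Extending the functionality of barriers
Sometimes you want a single thread (e.g., the last one to reach the barrier) to compute something once everybody has arrived: increment an iteration counter, compute some global value, or check whether execution should stop. You have two options:
Using pthread_barriers
You basically need to wait on the barrier twice: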
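int rc = pthread_barrier_wait(&b);
// pthread_barrier_wait() returns PTHREAD_BARRIER_SERIAL_THREAD to exactly one
// (unspecified) thread and 0 to all the others
if (rc == PTHREAD_BARRIER_SERIAL_THREAD)
    if (shouldStop()) stop = 1;
pthread_barrier_wait(&b);   // second wait: every thread now sees the updated stop
if (stop) return NULL;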
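Implementing our own specialized barrier with pthread_cond
pthread_mutex_lock(&mutex);
unsigned my_gen = generation;   // generation: shared round counter, guarded by mutex
remainingThreads--;
// all threads execute this
executedByAllThreads();
if (remainingThreads == 0) {
    // only the last thread executes this
    if (shouldStop()) stop = 1;
    // reinitialize the barrier for the next round
    remainingThreads = N_THREADS;
    generation++;
    pthread_cond_broadcast(&cond);
} else {
    // wait for the round to change; waiting on remainingThreads > 0 would
    // never end, because the last thread has just reset it to N_THREADS
    while (my_gen == generation)
        pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);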
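Note 1: Why is pthread_cond_wait() inside a while loop? It may look a bit odd. The reason is spurious wakeups: the function may return even if no signal() or broadcast() was issued. So, for correctness, there is always a predicate (an extra shared variable such as start_threads) that the thread re-checks, so that if it wakes up before it should, it goes straight back into pthread_cond_wait().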
From the manual:
When using condition variables there is always a Boolean predicate involving shared variables associated with each condition wait that is true if the thread should proceed. Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return.
(...)
If a signal is delivered to a thread waiting for a condition variable, upon return from the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or it shall return zero due to spurious wakeup.
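Note 2:
Michael Burr pointed out in the comments that you should hold the companion mutex whenever you modify the predicate (start_threads) and whenever you call pthread_cond_wait(). pthread_cond_wait() releases the mutex when called and reacquires it before it returns.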
PS: It's a bit late here; sorry if my text is confusing :-)