在 C 中的 pthreads 之间共享数据
Sharing Data among pthreads in C
我是 C 语言编程的新手,我正在尝试用它来学习线程。但是我不确定如何在线程之间传递参数。
例如,我正在尝试使用此公式计算范数
在单线程函数中,我可以轻松做到:
//vec is a vector array from x1 to xn
//len is the size of vec
float L2_norm_with_single_thread(const float *vec, size_t len) {
double result= 0.0f;
for (int i = 0; i < len; ++i) {
result += vec[i] * vec[i];
}
result = sqrt(result);
return result;
}
但我想在多线程中进行。但我不确定如何在子程序函数 Cal_L2_norm 和 return 中将结果加在一起到主函数。
void *Cal(void *arg) {
// not sure what I should put here to get the vector array.
// not sure what arg is for, but I just copy from other code.
}
float L2_norm(const float *vec, size_t len) {
int num_of_threads = 4 //to compare the efficiency with different number of threads
pthread_t ph[num_of_threads];
int rc;
float result;
for (int i = 0; i < num_of_threads; i++) {
rc = pthread_create(&ph[i], &attr[i], Cal, (void *)&args[i]);
assert(rc == 0);
}
for (int i = 0; i < num_of_threads; i++) {
rc = pthread_join(ph[i], NULL);
assert(rc == 0);
}
result = sqrt(result)
return result;
}
我需要像下面这样定义结构吗?
感谢任何帮助。谢谢。
全局 table 可以拆分为分配给每个线程的大小相等的子集。他们接收子集的大小,子集在全局向量中的偏移table,他们将return一个计算结果。因此,定义一个结构将这些参数传递给线程是一个解决方案:
// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
};
线程必须同时启动,否则某些线程可能会在其他线程创建之前完成。使用一个 barrier 让线程“同时”开始计算是一种解决方案。主程序使用涉及的线程数(包括主线程)初始化屏障:
// Thread synchronization
pthread_barrier_t barrier;
[...]
// Initialize the barrier (number of secondary threads + main thread)
rc = pthread_barrier_init(&barrier, NULL, nb_threads + 1);
if (rc != 0) {
errno = rc;
perror("pthread_barrier_init()");
return 1;
}
[...]
// Synchronize with the threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return 1;
}
正如您所说,您是 pthreads 的初学者,请注意,在出错时,函数不会设置 errno
而是 return 错误代码。因此,要在错误消息中使用 errno
,请不要忘记将 errno
设置为失败 pthread 函数的 return 代码。例如:
rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}
然后,程序可以将计算线程的数量和向量的值作为参数:
if (ac < 3) {
fprintf(stderr, "Usage: %s nb_threads x1 x2 x3...\n", basename(av[0]));
return 1;
}
这是程序的源代码示例:
/*
Compute the Euclidean norm (https://en.wikipedia.org/wiki/Norm_(mathematics))
*/
#include <pthread.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <libgen.h>
// Vector table
float *vec;
// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
};
// Thread synchronization
pthread_barrier_t barrier;
void *th_entry(void *arg)
{
struct th_param *param = (struct th_param *)arg;
int i;
int rc;
param->result = 0;
// Synchronize with the other threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return NULL;
}
for (i = 0; i < param->nb; i ++) {
param->result += vec[param->offset + i] * vec[param->offset + i];
}
return NULL;
}
int main(int ac, char *av[])
{
int i;
size_t nb_vec;
int nb_threads;
struct th_param *th_table;
int rc;
double result;
if (ac < 3) {
fprintf(stderr, "Usage: %s nb_threads x1 x2 x3...\n", basename(av[0]));
return 1;
}
nb_threads = atoi(av[1]);
if (nb_threads <= 0) {
fprintf(stderr, "Bad number of threads\n");
return 1;
}
nb_vec = ac - 2;
if (nb_threads > nb_vec) {
fprintf(stderr, "Too many threads\n");
return 1;
}
// Allocate the vector table
vec = (float *)malloc(nb_vec * sizeof(float));
if (!vec) {
perror("malloc");
return 1;
}
// Allocate the per-thread parameters
th_table = (struct th_param *)malloc(nb_threads * sizeof(struct th_param));
if (!th_table) {
perror("malloc");
return 1;
}
// Populate the vector table
for (i = 0; i < nb_vec; i ++) {
vec[i] = strtof(av[i + 2], NULL);
}
// Initialize the barrier (number of secondary threads + main thread)
rc = pthread_barrier_init(&barrier, NULL, nb_threads + 1);
if (rc != 0) {
errno = rc;
perror("pthread_barrier_init()");
return 1;
}
// Create the threads
for (i = 0; i < (nb_threads - 1); i ++) {
th_table[i].offset = i * (nb_vec / nb_threads);
th_table[i].nb = nb_vec / nb_threads;
printf("Thread#%d, offset=%d, nb=%zu\n", i, th_table[i].offset, th_table[i].nb);
rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}
}
// The last thread may have less/more slots
th_table[i].offset = i * (nb_vec / nb_threads);
th_table[i].nb = nb_vec / nb_threads;
th_table[i].nb = nb_vec - (i * th_table[i].nb);
printf("Thread#%d, offset=%d, nb=%zu\n", i, th_table[i].offset, th_table[i].nb);
rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}
result = 0;
// Synchronize with the threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return 1;
}
for (i = 0; i < nb_threads; i ++) {
rc = pthread_join(th_table[i].tid, NULL);
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}
result += th_table[i].result;
}
result = sqrt(result);
printf("Result=%f\n", result);
return 0;
}
建成:
$ gcc euclidean_norm.c -o euclidean_norm -lpthread -lm
运行它:
$ ./euclidean_norm
Usage: euclidean_norm nb_threads x1 x2 x3...
$ ./euclidean_norm 1 1 2 3
Thread#0, offset=0, nb=3
Result=3.741657
$ ./euclidean_norm 2 1 2 3
Thread#0, offset=0, nb=1
Thread#1, offset=1, nb=2
Result=3.741657
$ ./euclidean_norm 3 1 2 3
Thread#0, offset=0, nb=1
Thread#1, offset=1, nb=1
Thread#2, offset=2, nb=1
Result=3.741657
要测量持续时间,可以使用 time command or for finer granularity functions like gettimeofday()...
您还可以通过在线程创建时将 CPU affinity 属性作为线程条目中 pthread_create (cf. pthread_attr_init) or pass the CPU core number in the parameters and call pthread_set_affinity_np 的第二个参数传递,使线程 运行 在单独的 CPU 核心上点.
您也可以考虑设置 pthread_setschedparam 或上述线程属性的线程的调度 policy/priority。
在为 multi-threaded 应用程序实现数据结构时,存在一个常见的陷阱,即忘记或低估 false sharing on the performances. That is why data structure alignments on cache line sizes is also an important concern (e.g. gcc
provides the aligned 属性的影响)。在上面的应用示例中,当一个线程将其结果写入参数结构时,如果连续的参数结构共享相同的缓存行,它可能会触发与其他线程的假共享。要使用 gcc
解决这个问题,我们可以使用 aligned
属性使 table 参数的每个条目位于单独的缓存行上。获取缓存行大小的一种方法是查看 /proc/cpuinfo
:
$ cat /proc/cpuinfo | grep cache_alignment
cache_alignment : 64
[...]
结构可以重新定义为:
// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
} __attribute__ ((aligned (64)));
我是 C 语言编程的新手,我正在尝试用它来学习线程。但是我不确定如何在线程之间传递参数。
例如,我正在尝试使用此公式计算范数
在单线程函数中,我可以轻松做到:
//vec is a vector array from x1 to xn
//len is the size of vec
float L2_norm_with_single_thread(const float *vec, size_t len) {
double result= 0.0f;
for (int i = 0; i < len; ++i) {
result += vec[i] * vec[i];
}
result = sqrt(result);
return result;
}
但我想在多线程中进行。但我不确定如何在子程序函数 Cal_L2_norm 和 return 中将结果加在一起到主函数。
void *Cal(void *arg) {
// not sure what I should put here to get the vector array.
// not sure what arg is for, but I just copy from other code.
}
float L2_norm(const float *vec, size_t len) {
int num_of_threads = 4 //to compare the efficiency with different number of threads
pthread_t ph[num_of_threads];
int rc;
float result;
for (int i = 0; i < num_of_threads; i++) {
rc = pthread_create(&ph[i], &attr[i], Cal, (void *)&args[i]);
assert(rc == 0);
}
for (int i = 0; i < num_of_threads; i++) {
rc = pthread_join(ph[i], NULL);
assert(rc == 0);
}
result = sqrt(result)
return result;
}
我需要像下面这样定义结构吗?
感谢任何帮助。谢谢。
全局 table 可以拆分为分配给每个线程的大小相等的子集。他们接收子集的大小,子集在全局向量中的偏移table,他们将return一个计算结果。因此,定义一个结构将这些参数传递给线程是一个解决方案:
// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
};
线程必须同时启动,否则某些线程可能会在其他线程创建之前完成。使用一个 barrier 让线程“同时”开始计算是一种解决方案。主程序使用涉及的线程数(包括主线程)初始化屏障:
// Thread synchronization
pthread_barrier_t barrier;
[...]
// Initialize the barrier (number of secondary threads + main thread)
rc = pthread_barrier_init(&barrier, NULL, nb_threads + 1);
if (rc != 0) {
errno = rc;
perror("pthread_barrier_init()");
return 1;
}
[...]
// Synchronize with the threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return 1;
}
正如您所说,您是 pthreads 的初学者,请注意,在出错时,函数不会设置 errno
而是 return 错误代码。因此,要在错误消息中使用 errno
,请不要忘记将 errno
设置为失败 pthread 函数的 return 代码。例如:
rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}
然后,程序可以将计算线程的数量和向量的值作为参数:
if (ac < 3) {
fprintf(stderr, "Usage: %s nb_threads x1 x2 x3...\n", basename(av[0]));
return 1;
}
这是程序的源代码示例:
/*
Compute the Euclidean norm (https://en.wikipedia.org/wiki/Norm_(mathematics))
*/
#include <pthread.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <libgen.h>
// Vector table
float *vec;
// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
};
// Thread synchronization
pthread_barrier_t barrier;
void *th_entry(void *arg)
{
struct th_param *param = (struct th_param *)arg;
int i;
int rc;
param->result = 0;
// Synchronize with the other threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return NULL;
}
for (i = 0; i < param->nb; i ++) {
param->result += vec[param->offset + i] * vec[param->offset + i];
}
return NULL;
}
int main(int ac, char *av[])
{
int i;
size_t nb_vec;
int nb_threads;
struct th_param *th_table;
int rc;
double result;
if (ac < 3) {
fprintf(stderr, "Usage: %s nb_threads x1 x2 x3...\n", basename(av[0]));
return 1;
}
nb_threads = atoi(av[1]);
if (nb_threads <= 0) {
fprintf(stderr, "Bad number of threads\n");
return 1;
}
nb_vec = ac - 2;
if (nb_threads > nb_vec) {
fprintf(stderr, "Too many threads\n");
return 1;
}
// Allocate the vector table
vec = (float *)malloc(nb_vec * sizeof(float));
if (!vec) {
perror("malloc");
return 1;
}
// Allocate the per-thread parameters
th_table = (struct th_param *)malloc(nb_threads * sizeof(struct th_param));
if (!th_table) {
perror("malloc");
return 1;
}
// Populate the vector table
for (i = 0; i < nb_vec; i ++) {
vec[i] = strtof(av[i + 2], NULL);
}
// Initialize the barrier (number of secondary threads + main thread)
rc = pthread_barrier_init(&barrier, NULL, nb_threads + 1);
if (rc != 0) {
errno = rc;
perror("pthread_barrier_init()");
return 1;
}
// Create the threads
for (i = 0; i < (nb_threads - 1); i ++) {
th_table[i].offset = i * (nb_vec / nb_threads);
th_table[i].nb = nb_vec / nb_threads;
printf("Thread#%d, offset=%d, nb=%zu\n", i, th_table[i].offset, th_table[i].nb);
rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}
}
// The last thread may have less/more slots
th_table[i].offset = i * (nb_vec / nb_threads);
th_table[i].nb = nb_vec / nb_threads;
th_table[i].nb = nb_vec - (i * th_table[i].nb);
printf("Thread#%d, offset=%d, nb=%zu\n", i, th_table[i].offset, th_table[i].nb);
rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}
result = 0;
// Synchronize with the threads
rc = pthread_barrier_wait(&barrier);
if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
errno = rc;
perror("pthread_barrier_wait()");
return 1;
}
for (i = 0; i < nb_threads; i ++) {
rc = pthread_join(th_table[i].tid, NULL);
if (rc != 0) {
errno = rc;
perror("pthread_create()");
return 1;
}
result += th_table[i].result;
}
result = sqrt(result);
printf("Result=%f\n", result);
return 0;
}
建成:
$ gcc euclidean_norm.c -o euclidean_norm -lpthread -lm
运行它:
$ ./euclidean_norm
Usage: euclidean_norm nb_threads x1 x2 x3...
$ ./euclidean_norm 1 1 2 3
Thread#0, offset=0, nb=3
Result=3.741657
$ ./euclidean_norm 2 1 2 3
Thread#0, offset=0, nb=1
Thread#1, offset=1, nb=2
Result=3.741657
$ ./euclidean_norm 3 1 2 3
Thread#0, offset=0, nb=1
Thread#1, offset=1, nb=1
Thread#2, offset=2, nb=1
Result=3.741657
要测量持续时间,可以使用 time command or for finer granularity functions like gettimeofday()...
您还可以通过在线程创建时将 CPU affinity 属性作为线程条目中 pthread_create (cf. pthread_attr_init) or pass the CPU core number in the parameters and call pthread_set_affinity_np 的第二个参数传递,使线程 运行 在单独的 CPU 核心上点.
您也可以考虑设置 pthread_setschedparam 或上述线程属性的线程的调度 policy/priority。
在为 multi-threaded 应用程序实现数据结构时,存在一个常见的陷阱,即忘记或低估 false sharing on the performances. That is why data structure alignments on cache line sizes is also an important concern (e.g. gcc
provides the aligned 属性的影响)。在上面的应用示例中,当一个线程将其结果写入参数结构时,如果连续的参数结构共享相同的缓存行,它可能会触发与其他线程的假共享。要使用 gcc
解决这个问题,我们可以使用 aligned
属性使 table 参数的每个条目位于单独的缓存行上。获取缓存行大小的一种方法是查看 /proc/cpuinfo
:
$ cat /proc/cpuinfo | grep cache_alignment
cache_alignment : 64
[...]
结构可以重新定义为:
// Thread's parameters
struct th_param {
pthread_t tid;
int offset;
size_t nb;
double result;
} __attribute__ ((aligned (64)));