在 C 中的 pthreads 之间共享数据

Sharing Data among pthreads in C

我是 C 语言编程的新手,我正在尝试用它来学习线程。但是我不确定如何在线程之间传递参数。 例如,我正在尝试使用此公式计算范数

在单线程函数中,我可以轻松做到:

//vec is a vector array from x1 to xn
//len is the size of vec
float L2_norm_with_single_thread(const float *vec, size_t len) {
  double result= 0.0f;
  for (int i = 0; i < len; ++i) {
    result += vec[i] * vec[i];
  }
  result = sqrt(result);
  return result;
}

但我想在多线程中进行。但我不确定如何在子程序函数 Cal_L2_norm 和 return 中将结果加在一起到主函数。

void *Cal(void *arg) { 
// not sure what I should put here to get the vector array. 
// not sure what arg is for, but I just copy from other code. 
}

float L2_norm(const float *vec, size_t len) {
  int num_of_threads = 4 //to compare the efficiency with different number of threads
  pthread_t ph[num_of_threads];
  int rc;
  float result;

  for (int i = 0; i < num_of_threads; i++) {
    rc = pthread_create(&ph[i], &attr[i], Cal, (void *)&args[i]);
    assert(rc == 0);
  }

  for (int i = 0; i < num_of_threads; i++) {
    rc = pthread_join(ph[i], NULL);
    assert(rc == 0);
  }
  result = sqrt(result)
  return result;
 }

我需要像下面这样定义结构吗?

感谢任何帮助。谢谢。

全局 table 可以拆分为分配给每个线程的大小相等的子集。他们接收子集的大小,子集在全局向量中的偏移table,他们将return一个计算结果。因此,定义一个结构将这些参数传递给线程是一个解决方案:

// Thread's parameters
struct th_param {
  pthread_t tid;
  int       offset;
  size_t    nb;
  double    result;
};

线程必须同时启动,否则某些线程可能会在其他线程创建之前完成。使用一个 barrier 让线程“同时”开始计算是一种解决方案。主程序使用涉及的线程数(包括主线程)初始化屏障:

// Thread synchronization
pthread_barrier_t barrier;
[...]
  // Initialize the barrier (number of secondary threads + main thread)
  rc = pthread_barrier_init(&barrier, NULL, nb_threads + 1);
  if (rc != 0) {
    errno = rc;
    perror("pthread_barrier_init()");
    return 1;
  }
[...]
  // Synchronize with the threads
  rc = pthread_barrier_wait(&barrier);
  if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
    errno = rc;
    perror("pthread_barrier_wait()");
    return 1;
  }

正如您所说,您是 pthreads 的初学者,请注意,在出错时,函数不会设置 errno 而是 return 错误代码。因此,要在错误消息中使用 errno,请不要忘记将 errno 设置为失败 pthread 函数的 return 代码。例如:

rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
if (rc != 0) {
  errno = rc;
  perror("pthread_create()");
  return 1;
}

然后,程序可以将计算线程的数量和向量的值作为参数:

  if (ac < 3) {
    fprintf(stderr, "Usage: %s nb_threads x1 x2 x3...\n", basename(av[0]));
    return 1;
  }

这是程序的源代码示例:

/*

  Compute the Euclidean norm (https://en.wikipedia.org/wiki/Norm_(mathematics))

*/
#include <pthread.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <libgen.h>

// Vector table
float *vec;


// Thread's parameters
struct th_param {
  pthread_t tid;
  int       offset;
  size_t    nb;
  double    result;
};

// Thread synchronization
pthread_barrier_t barrier;


void *th_entry(void *arg)
{
  struct th_param *param = (struct th_param *)arg;
  int i;
  int rc;

  param->result = 0;

  // Synchronize with the other threads
  rc = pthread_barrier_wait(&barrier);
  if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
    errno = rc;
    perror("pthread_barrier_wait()");
    return NULL;
  }

  for (i = 0; i < param->nb; i ++) {
    param->result += vec[param->offset + i] * vec[param->offset + i];
  }

  return NULL;
}

int main(int ac, char *av[])
{
  int              i;
  size_t           nb_vec;
  int              nb_threads;
  struct th_param *th_table;
  int              rc;
  double           result;

  if (ac < 3) {
    fprintf(stderr, "Usage: %s nb_threads x1 x2 x3...\n", basename(av[0]));
    return 1;
  }

  nb_threads = atoi(av[1]);

  if (nb_threads <= 0) {
    fprintf(stderr, "Bad number of threads\n");
    return 1;
  }

  nb_vec = ac - 2;

  if (nb_threads > nb_vec) {
    fprintf(stderr, "Too many threads\n");
    return 1;
  }

  // Allocate the vector table
  vec = (float *)malloc(nb_vec * sizeof(float));
  if (!vec) {
    perror("malloc");
    return 1;
  }

  // Allocate the per-thread parameters
  th_table = (struct th_param *)malloc(nb_threads * sizeof(struct th_param));
  if (!th_table) {
    perror("malloc");
    return 1;
  }

  // Populate the vector table
  for (i = 0; i < nb_vec; i ++) {

    vec[i] = strtof(av[i + 2], NULL);

  }

  // Initialize the barrier (number of secondary threads + main thread)
  rc = pthread_barrier_init(&barrier, NULL, nb_threads + 1);
  if (rc != 0) {
    errno = rc;
    perror("pthread_barrier_init()");
    return 1;
  }

  // Create the threads
  for (i = 0; i < (nb_threads - 1); i ++) {

    th_table[i].offset = i * (nb_vec / nb_threads);
    th_table[i].nb = nb_vec / nb_threads;

    printf("Thread#%d, offset=%d, nb=%zu\n", i, th_table[i].offset, th_table[i].nb);

    rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
    if (rc != 0) {
      errno = rc;
      perror("pthread_create()");
      return 1;
    }

  }

  // The last thread may have less/more slots
  th_table[i].offset = i * (nb_vec / nb_threads);
  th_table[i].nb = nb_vec / nb_threads;
  th_table[i].nb = nb_vec - (i * th_table[i].nb);

  printf("Thread#%d, offset=%d, nb=%zu\n", i, th_table[i].offset, th_table[i].nb);

  rc = pthread_create(&(th_table[i].tid), NULL, th_entry, &(th_table[i]));
  if (rc != 0) {
    errno = rc;
    perror("pthread_create()");
    return 1;
  }

  result = 0;

  // Synchronize with the threads
  rc = pthread_barrier_wait(&barrier);
  if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {
    errno = rc;
    perror("pthread_barrier_wait()");
    return 1;
  }

  for (i = 0; i < nb_threads; i ++) {

    rc = pthread_join(th_table[i].tid, NULL);
    if (rc != 0) {
      errno = rc;
      perror("pthread_create()");
      return 1;
    }

    result += th_table[i].result;

  }

  result = sqrt(result);

  printf("Result=%f\n", result);

  return 0;

}

建成:

$ gcc euclidean_norm.c -o euclidean_norm -lpthread -lm

运行它:

$ ./euclidean_norm
Usage: euclidean_norm nb_threads x1 x2 x3...
$ ./euclidean_norm 1 1 2 3
Thread#0, offset=0, nb=3
Result=3.741657
$ ./euclidean_norm 2 1 2 3
Thread#0, offset=0, nb=1
Thread#1, offset=1, nb=2
Result=3.741657
$ ./euclidean_norm 3 1 2 3
Thread#0, offset=0, nb=1
Thread#1, offset=1, nb=1
Thread#2, offset=2, nb=1
Result=3.741657

要测量持续时间,可以使用 time command or for finer granularity functions like gettimeofday()...

您还可以通过在线程创建时将 CPU affinity 属性作为线程条目中 pthread_create (cf. pthread_attr_init) or pass the CPU core number in the parameters and call pthread_set_affinity_np 的第二个参数传递,使线程 运行 在单独的 CPU 核心上点.

您也可以考虑设置 pthread_setschedparam 或上述线程属性的线程的调度 policy/priority。

在为 multi-threaded 应用程序实现数据结构时,存在一个常见的陷阱,即忘记或低估 false sharing on the performances. That is why data structure alignments on cache line sizes is also an important concern (e.g. gcc provides the aligned 属性的影响)。在上面的应用示例中,当一个线程将其结果写入参数结构时,如果连续的参数结构共享相同的缓存行,它可能会触发与其他线程的假共享。要使用 gcc 解决这个问题,我们可以使用 aligned 属性使 table 参数的每个条目位于单独的缓存行上。获取缓存行大小的一种方法是查看 /proc/cpuinfo:

$ cat /proc/cpuinfo | grep cache_alignment
cache_alignment : 64
[...]

结构可以重新定义为:

// Thread's parameters
struct th_param {
  pthread_t tid;
  int       offset;
  size_t    nb;
  double    result;
} __attribute__ ((aligned (64)));