C 线程池 - 传递参数重复

C Thread pool - passing argument duplicate

源代码如下:

#define THREAD 32
#define QUEUE 300
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include "threadpool.h"


struct fparam {
int no;
};

int tasks = 0, done = 0;
pthread_mutex_t lock;

int exit_me(){
    pthread_mutex_lock(&lock);
    tasks--;
    pthread_mutex_unlock(&lock);
    return 0;
}

void dummy_task(void *arg) {
    struct fparam *args = arg;
    pthread_mutex_lock(&lock);
    done++;
    pthread_mutex_unlock(&lock);
    printf("Thread INDEX: %d started.\n",args->no);
    exit_me();
}

int main()
{
    int t, result;
    threadpool_t *pool;
    struct fparam push_args;
    pthread_mutex_init(&lock, NULL);
    pool = threadpool_create(THREAD, QUEUE, 0);
    fprintf(stderr, "Pool started with %d threads and "
            "queue size of %d\n", THREAD, QUEUE);

    for (t = 0;t < 2000; t++){
        push_args.no = t;
        result = threadpool_add(pool, &dummy_task, (void *)&push_args, 0);
        if (result == 0){
            pthread_mutex_lock(&lock);
            tasks++;
            pthread_mutex_unlock(&lock);
        } else {
             printf("Something went wrong with thread: %d\n", t);
        }
        while(tasks  >= QUEUE); // do nothing until tasks running is less than max queue.
    }

    while(tasks >= 1);
    return 0;
}

我正在使用 https://github.com/mbrossard/threadpool 池实现。

一切看起来都很好,但是在检查传递给虚拟函数的 t 参数时,我可以看到重复项:

Thread INDEX: 1998 started.
Thread INDEX: 1999 started.
Thread INDEX: 1999 started.
Thread INDEX: 1974 started.
Thread INDEX: 1979 started.
Thread INDEX: 1979 started.
Thread INDEX: 1978 started.
Thread INDEX: 1979 started.
Thread INDEX: 1979 started.

我想给出的代码没有任何竞争条件,因为 fparam 结构是在函数内部声明的。 有什么想法或建议吗?

是的,您的 push_args 存在竞争条件。虽然每个线程都获得了您传入的参数的副本,但您这样做的方式意味着每个线程都获得了相同的值(指针)。

并且主线程在启动新线程时不断修改后面指针的数据,但线程本身正在启动并使用它。

以下面的序列为例思考:

  • main 将 ID 设置为 1 并创建线程 A。
  • main 将 ID 设置为 2 并创建线程 B。
  • 线程A启动并读取2的ID。
  • 线程 B 启动并读取 2 的 ID。

现在两个线程都使用 ID 2。

如果你想那样做,你可能需要等到每个线程都制作了这个 ID 值的本地副本,然后才能在 main.

中修改它

另一种方法是 "automatically" 通过将其作为参数本身(而不是指向它的指针)传递给线程一个本地副本,例如:

result = threadpool_add(pool, &dummy_task, (void *)t, 0);

这确保每个线程都收到 t 的本地化副本,就像调用时一样。您只需将其从 void* 转换回 int

如果您不能使用能够与指针相互转换的简单变量,并且不想等到每个线程都创建了本地副本后再开始下一个,则需要将项目分开正在通过。

实现此目的的一种方法是拥有一个 数组 项目(在您的情况下为结构)并将它们传递给等效线程。例如,您可以这样做:

static struct payload items[100];
for (int i = 0; i < 100; i++) {
    items[i].t = i;
    result = threadpool_add(pool, &dummy_task, (void *)(&items[i]), 0);
    // check result.
}

这在内存上有点昂贵,但它解决了竞争条件问题,而无需序列化线程创建。

我真的做到了。请检查以下内容并提出建议。

int main()
{
    int t, result;
    threadpool_t *pool;
    struct fparam *push_args = NULL;

    pthread_mutex_init(&lock, NULL);
    pool = threadpool_create(THREAD, QUEUE, 0);
    fprintf(stderr, "Pool started with %d threads and "
                    "queue size of %d\n", THREAD, QUEUE);
    for (t = 0;t < 2000; t++){
            push_args = (struct fparam*)malloc(sizeof *push_args);
            push_args->no = t;
            result = threadpool_add(pool, &dummy_task, push_args, 0);
            if (result == 0){
                    pthread_mutex_lock(&lock);
                    tasks++;
                    pthread_mutex_unlock(&lock);
            } else {
                    printf("Something went wrong with thread: %d\n", t);
            }
            while(tasks  >= QUEUE); // do nothing until tasks running is less than max queue.
    }

    while(tasks >= 1);
    free(push_args);
    return 0;
}