在 c 和 运行 中使用互斥锁的多线程一次一个线程

multithreading with mutexes in c and running one thread at a time

我有一个 100 requests(整数)数组。我想创建 4 个线程,我调用一个函数(thread_function),使用这个函数,我希望每个线程一个一个地接受请求:

(thread0->request0,

thread1->request1,

thread2->request2,

thread3->request3

然后是 thread0->request4 等等最多 100) 所有这些都是通过使用 mutexes。 这是我到目前为止编写的代码:

threadRes = pthread_create(&(threadID[i]), NULL,thread_function, (void *)id_size);

这是在我的主内,在我的主外循环 4 times.Now:

void *thread_function(void *arg){
int *val_p=(int *) arg;
for(i=0; i<200; i=i+2)     
{
    f=false;
        
    for (j= 0; j<100; j++)   
    {
        if (val_p[i]==cache[j].id)  
            f=true;
      
    }
    if(f==true) 
    {
        printf("The request  %d has been served.\n",val_p[i]); 

    }
    else
    {
        cache[k].id=val_p[i];
        printf("\nCurrent request to be served:%d \n",cache[k].id);
        k++;
    }
}

其中:val_p 是包含请求的数组,cache 是存储 id(请求)。

-所以现在我想要互斥锁来同步我的线程。我考虑过在我的 main:

中使用
pthread_join(threadID[0], NULL);
pthread_join(threadID[1], NULL);
pthread_join(threadID[2], NULL);
pthread_join(threadID[3], NULL);

pthread_mutex_destroy(&mutex);

并在函数内部使用:

pthread_mutex_lock(&mutex);
pthread_mutex_unlock(&mutex);

在我结束之前我想说到目前为止我的程序结果是 4threads 运行 100 个请求每个 (400) 我想要实现的是4 个线程 运行 总共 100 个线程

感谢您的宝贵时间。

您需要使用如下所示的循环:

  1. 获取锁。
  2. 看看有没有什么工作要做。如果不是,则释放锁并终止。
  3. 将我们要做的工作标记为不再需要完成。
  4. 解除锁定。
  5. 做作业。
  6. (如有必要)获取锁。标记工作完成 and/or 报告结果。释放锁。
  7. 转到步骤 1。

注意在持有锁的同时,线程如何发现它应该做的工作,然后在释放锁之前阻止任何其他线程进行相同的分配。另请注意,在执行工作时不持有锁,以便多个线程可以同时工作。

您可能想要 post 更多代码。如何设置数组,如何将段传递给各个线程等

请注意,使用 printf 会扰乱线程的计时。它有自己的互斥锁来访问 stdout,因此最好不要对此进行操作。或者,拥有一组每线程日志文件,以便 printf 调用不会相互阻塞。

此外,在线程循环中,一旦将 f 设置为 true,就可以发出 break,因为无需进一步扫描。

val_p[i] 是循环不变的,因此我们可以在 i 循环开始时只获取 一次

我们没有看到 kcache,但您需要互斥包装 设置 这些值的代码。

但是,这 不会 防止 for 循环中的竞争。您必须将 cache[j].id 的提取包装在循环内的互斥锁对中。你可能没问题没有一些具有良好缓存侦听的拱门循环内的互斥锁(例如x86).


您最好使用 stdatomic.h 原语。这是一个说明这一点的版本。它编译但我没有测试它:

#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>

int k;
#define true    1
#define false   0

struct cache {
    int id;
};

struct cache cache[100];

#ifdef DEBUG
#define dbgprt(_fmt...) \
    printf(_fmt)
#else
#define dbgprt(_fmt...) \
    do { } while (0)
#endif

void *
thread_function(void *arg)
{
    int *val_p = arg;
    int i;
    int j;
    int cval;
    int *cptr;

    for (i = 0; i < 200; i += 2) {
        int pval = val_p[i];
        int f = false;

        // decide if request has already been served
        for (j = 0; j < 100; j++) {
            cptr = &cache[j].id;
            cval = atomic_load(cptr);
            if (cval == pval) {
                f = true;
                break;
            }
        }

        if (f == true) {
            dbgprt("The request %d has been served.\n",pval);
            continue;
        }

        // increment the global k value [atomically]
        int kold = atomic_load(&k);
        int knew;
        while (1) {
            knew = kold + 1;
            if (atomic_compare_exchange_strong(&k,&kold,knew))
                break;
        }

        // get current cache value
        cptr = &cache[kold].id;
        int oldval = atomic_load(cptr);

        // mark the cache
        // this should never loop because we atomically got our slot with
        // the k value
        while (1) {
            if (atomic_compare_exchange_strong(cptr,&oldval,pval))
                break;
        }

        dbgprt("\nCurrent request to be served:%d\n",pval);
    }

    return (void *) 0;
}