C 中的 Pthread 信号 Linux

Pthread signalling in C Linux

我正在使用 Pthread 在 Linux 中使用多线程。

Thread1 通过轮询字符设备文件等待来自驱动程序的 IRQ(我的驱动程序具有 ISR 以捕获来自硬件的 IRQ)。

IRQ -----> Thread1 |-----> Thread2 |-----> Thread3 |-----> Thread4

每当 Thread1 收到 IRQ 时,我想向 Thread2Thread3Thread4 发送信号以唤醒它们然后工作。

现在,我正在尝试使用 "pthread conditional variable" 和 "pthread mutex"。不过好像不是什么好办法。

在这种情况下,什么是有效的同步方式?请帮忙。

非常感谢。

据我了解,您的问题是您的子线程(线程 2 到 4)并不总是为 Thread1 收到的每个 IRQ 恰好唤醒一次——特别是,可能是收到了一个 IRQ而子线程已经唤醒并处理较早的 IRQ,这导致它们不会被新的 IRQ 唤醒。

如果这是正确的,那么我认为一个简单的解决方案是为每个子线程使用 counting semaphore,而不是条件变量。信号量是一种简单的数据结构,它维护一个整数计数器,并提供两个操作,wait/P 和 signal/V。 wait/P 递减计数器,如果计数器的新值为负,它将阻塞直到计数器再次变为非负。 signal/V 递增计数器,如果计数器在递增之前为负,则唤醒一个等待线程(如果一个线程在 wait/P 中被阻塞)。

这样做的效果是,在您的主线程快速连续获得多个 IRQ 的情况下,信号量将 "remember" 多次 signal/V 调用(作为计数器的正整数值),并允许工作线程在未来多次调用 wait/P 而不会阻塞。这样就没有信号 "forgotten".

Linux 提供一个信号量 API(通过 sem_init() 等),但它是为进程间同步而设计的,因此对于同步进程内的线程来说有点重量级一个单一的过程。幸运的是,使用 pthreads 互斥锁和条件变量很容易实现您自己的信号量,如下所示。

请注意,在这个玩具示例中,main() 线程扮演 Thread1 的角色,每次您在终端 return 中按 return 时,它都会假装收到 IRQ .子线程扮演 Threads2-4 的角色,每次 Thread1 向它们发出信号时,它们都会假装做一秒钟的 "work"。特别要注意,如果您快速连续多次按下 return,子线程将始终执行那么多 "work units",即使它们每秒只能执行一个工作单元。

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

struct example_semaphore
{
   pthread_cond_t cond;
   pthread_mutex_t mutex;
   int count;  // acccess to this is serialized by locking (mutex)
};

// Initializes the example_semaphore (to be called at startup)
void Init_example_semaphore(struct example_semaphore * s)
{
   s->count = 0;
   pthread_mutex_init(&s->mutex, NULL);
   pthread_cond_init(&s->cond, NULL);
}

// V:  Increments the example_semaphore's count by 1.  If the pre-increment
//     value was negative, wakes a process that was waiting on the
//     example_semaphore
void Signal_example_semaphore(struct example_semaphore * s)
{
   pthread_mutex_lock(&s->mutex);
   if (s->count++ < 0) pthread_cond_signal(&s->cond);
   pthread_mutex_unlock(&s->mutex);
}

// P:  Decrements the example_semaphore's count by 1.  If the new value of the
//     example_semaphore is negative, blocks the caller until another thread calls
//     Signal_example_semaphore()
void Wait_example_semaphore(struct example_semaphore * s)
{
   pthread_mutex_lock(&s->mutex);
   while(--s->count < 0)
   {
      pthread_cond_wait(&s->cond, &s->mutex);
      if (s->count >= 0) break;
   }
   pthread_mutex_unlock(&s->mutex);
}

// This is the function that the worker-threads run
void * WorkerThreadFunc(void * arg)
{
   int workUnit = 0;
   struct example_semaphore * my_semaphore = (struct example_semaphore *) arg;
   while(1)
   {
      Wait_example_semaphore(my_semaphore);  // wait here until it's time to work
      printf("Thread %p: just woke up and is working on work-unit #%i...\n", my_semaphore, workUnit++);
      sleep(1);  // actual work would happen here in a real program
   }
}

static const int NUM_THREADS = 3;

int main(int argc, char ** argv)
{
   struct example_semaphore semaphores[NUM_THREADS];
   pthread_t worker_threads[NUM_THREADS];

   // Setup semaphores and spawn worker threads
   int i = 0;
   for (i=0; i<NUM_THREADS; i++)
   {
      Init_example_semaphore(&semaphores[i]);
      pthread_create(&worker_threads[i], NULL, WorkerThreadFunc, &semaphores[i]);
   }

   // Now we'll pretend to be receiving IRQs.  We'll pretent to
   // get one IRQ each time you press return.
   while(1)
   {
      char buf[128];
      fgets(buf, sizeof(buf), stdin);
      printf("Main thread got IRQ, signalling child threads now!\n");
      for (i=0; i<NUM_THREADS; i++) Signal_example_semaphore(&semaphores[i]);
   }
}

我喜欢 jeremy 的回答,但它确实有一些不足,因为中断调度程序需要知道每个中断要增加多少信号量。 此外,每个增量都可能是一个内核调用,因此每个中断都有很多内核调用。

另一种方法是了解 pthread_cond_broadcast() 的工作原理。我在下面举了一个例子:

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>

#ifndef NTHREAD
#define NTHREAD 5
#endif

pthread_mutex_t Lock;
pthread_cond_t  CV;
int GlobalCount;
int Done;

#define X(y)   do { if (y == -1) abort(); } while (0)

void *handler(void *x) {
    unsigned icount;

    X(pthread_mutex_lock(&Lock));
    icount = 0;
    while (!Done) {
        if (icount < GlobalCount) {
            X(pthread_mutex_unlock(&Lock));
            icount++;
            X(pthread_mutex_lock(&Lock));
        } else {
            X(pthread_cond_wait(&CV, &Lock));
        }
    }
    X(pthread_mutex_unlock(&Lock));
    return NULL;
}

int 
main()
{
    X(pthread_mutex_init(&Lock, NULL));
    X(pthread_cond_init(&CV, NULL));
    pthread_t id[NTHREAD];
    int i;
    for (i = 0; i < NTHREAD; i++) {
        X(pthread_create(id+i, NULL, handler, NULL));
    }
    int c;
    while ((c = getchar()) != EOF) {
        X(pthread_mutex_lock(&Lock));
        GlobalCount++;
        X(pthread_mutex_unlock(&Lock));
        X(pthread_cond_broadcast(&CV));
    }

    X(pthread_mutex_lock(&Lock));
    Done = 1;
    X(pthread_cond_broadcast(&CV));
    X(pthread_mutex_unlock(&Lock));
    for (i = 0; i < NTHREAD; i++) {
        X(pthread_join(id[i], NULL));
    }
    return 0;
}