C 中的 Pthread 信号 Linux
Pthread signalling in C Linux
我正在使用 Pthread
在 Linux 中使用多线程。
Thread1
通过轮询字符设备文件等待来自驱动程序的 IRQ(我的驱动程序具有 ISR 以捕获来自硬件的 IRQ)。
IRQ -----> Thread1 |-----> Thread2
|-----> Thread3
|-----> Thread4
每当 Thread1
收到 IRQ 时,我想向 Thread2
、Thread3
和 Thread4
发送信号以唤醒它们然后工作。
现在,我正在尝试使用 "pthread conditional variable" 和 "pthread mutex"。不过好像不是什么好办法。
在这种情况下,什么是有效的同步方式?请帮忙。
非常感谢。
据我了解,您的问题是您的子线程(线程 2 到 4)并不总是为 Thread1 收到的每个 IRQ 恰好唤醒一次——特别是,可能是收到了一个 IRQ而子线程已经唤醒并处理较早的 IRQ,这导致它们不会被新的 IRQ 唤醒。
如果这是正确的,那么我认为一个简单的解决方案是为每个子线程使用 counting semaphore,而不是条件变量。信号量是一种简单的数据结构,它维护一个整数计数器,并提供两个操作,wait/P 和 signal/V。 wait/P 递减计数器,如果计数器的新值为负,它将阻塞直到计数器再次变为非负。 signal/V 递增计数器,如果计数器在递增之前为负,则唤醒一个等待线程(如果一个线程在 wait/P 中被阻塞)。
这样做的效果是,在您的主线程快速连续获得多个 IRQ 的情况下,信号量将 "remember" 多次 signal/V 调用(作为计数器的正整数值),并允许工作线程在未来多次调用 wait/P 而不会阻塞。这样就没有信号 "forgotten".
Linux 提供一个信号量 API(通过 sem_init() 等),但它是为进程间同步而设计的,因此对于同步进程内的线程来说有点重量级一个单一的过程。幸运的是,使用 pthreads 互斥锁和条件变量很容易实现您自己的信号量,如下所示。
请注意,在这个玩具示例中,main() 线程扮演 Thread1 的角色,每次您在终端 return 中按 return 时,它都会假装收到 IRQ .子线程扮演 Threads2-4 的角色,每次 Thread1 向它们发出信号时,它们都会假装做一秒钟的 "work"。特别要注意,如果您快速连续多次按下 return,子线程将始终执行那么多 "work units",即使它们每秒只能执行一个工作单元。
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
struct example_semaphore
{
pthread_cond_t cond;
pthread_mutex_t mutex;
int count; // acccess to this is serialized by locking (mutex)
};
// Initializes the example_semaphore (to be called at startup)
void Init_example_semaphore(struct example_semaphore * s)
{
s->count = 0;
pthread_mutex_init(&s->mutex, NULL);
pthread_cond_init(&s->cond, NULL);
}
// V: Increments the example_semaphore's count by 1. If the pre-increment
// value was negative, wakes a process that was waiting on the
// example_semaphore
void Signal_example_semaphore(struct example_semaphore * s)
{
pthread_mutex_lock(&s->mutex);
if (s->count++ < 0) pthread_cond_signal(&s->cond);
pthread_mutex_unlock(&s->mutex);
}
// P: Decrements the example_semaphore's count by 1. If the new value of the
// example_semaphore is negative, blocks the caller until another thread calls
// Signal_example_semaphore()
void Wait_example_semaphore(struct example_semaphore * s)
{
pthread_mutex_lock(&s->mutex);
while(--s->count < 0)
{
pthread_cond_wait(&s->cond, &s->mutex);
if (s->count >= 0) break;
}
pthread_mutex_unlock(&s->mutex);
}
// This is the function that the worker-threads run
void * WorkerThreadFunc(void * arg)
{
int workUnit = 0;
struct example_semaphore * my_semaphore = (struct example_semaphore *) arg;
while(1)
{
Wait_example_semaphore(my_semaphore); // wait here until it's time to work
printf("Thread %p: just woke up and is working on work-unit #%i...\n", my_semaphore, workUnit++);
sleep(1); // actual work would happen here in a real program
}
}
static const int NUM_THREADS = 3;
int main(int argc, char ** argv)
{
struct example_semaphore semaphores[NUM_THREADS];
pthread_t worker_threads[NUM_THREADS];
// Setup semaphores and spawn worker threads
int i = 0;
for (i=0; i<NUM_THREADS; i++)
{
Init_example_semaphore(&semaphores[i]);
pthread_create(&worker_threads[i], NULL, WorkerThreadFunc, &semaphores[i]);
}
// Now we'll pretend to be receiving IRQs. We'll pretent to
// get one IRQ each time you press return.
while(1)
{
char buf[128];
fgets(buf, sizeof(buf), stdin);
printf("Main thread got IRQ, signalling child threads now!\n");
for (i=0; i<NUM_THREADS; i++) Signal_example_semaphore(&semaphores[i]);
}
}
我喜欢 jeremy 的回答,但它确实有一些不足,因为中断调度程序需要知道每个中断要增加多少信号量。
此外,每个增量都可能是一个内核调用,因此每个中断都有很多内核调用。
另一种方法是了解 pthread_cond_broadcast() 的工作原理。我在下面举了一个例子:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#ifndef NTHREAD
#define NTHREAD 5
#endif
pthread_mutex_t Lock;
pthread_cond_t CV;
int GlobalCount;
int Done;
#define X(y) do { if (y == -1) abort(); } while (0)
void *handler(void *x) {
unsigned icount;
X(pthread_mutex_lock(&Lock));
icount = 0;
while (!Done) {
if (icount < GlobalCount) {
X(pthread_mutex_unlock(&Lock));
icount++;
X(pthread_mutex_lock(&Lock));
} else {
X(pthread_cond_wait(&CV, &Lock));
}
}
X(pthread_mutex_unlock(&Lock));
return NULL;
}
int
main()
{
X(pthread_mutex_init(&Lock, NULL));
X(pthread_cond_init(&CV, NULL));
pthread_t id[NTHREAD];
int i;
for (i = 0; i < NTHREAD; i++) {
X(pthread_create(id+i, NULL, handler, NULL));
}
int c;
while ((c = getchar()) != EOF) {
X(pthread_mutex_lock(&Lock));
GlobalCount++;
X(pthread_mutex_unlock(&Lock));
X(pthread_cond_broadcast(&CV));
}
X(pthread_mutex_lock(&Lock));
Done = 1;
X(pthread_cond_broadcast(&CV));
X(pthread_mutex_unlock(&Lock));
for (i = 0; i < NTHREAD; i++) {
X(pthread_join(id[i], NULL));
}
return 0;
}
我正在使用 Pthread
在 Linux 中使用多线程。
Thread1
通过轮询字符设备文件等待来自驱动程序的 IRQ(我的驱动程序具有 ISR 以捕获来自硬件的 IRQ)。
IRQ -----> Thread1 |-----> Thread2
|-----> Thread3
|-----> Thread4
每当 Thread1
收到 IRQ 时,我想向 Thread2
、Thread3
和 Thread4
发送信号以唤醒它们然后工作。
现在,我正在尝试使用 "pthread conditional variable" 和 "pthread mutex"。不过好像不是什么好办法。
在这种情况下,什么是有效的同步方式?请帮忙。
非常感谢。
据我了解,您的问题是您的子线程(线程 2 到 4)并不总是为 Thread1 收到的每个 IRQ 恰好唤醒一次——特别是,可能是收到了一个 IRQ而子线程已经唤醒并处理较早的 IRQ,这导致它们不会被新的 IRQ 唤醒。
如果这是正确的,那么我认为一个简单的解决方案是为每个子线程使用 counting semaphore,而不是条件变量。信号量是一种简单的数据结构,它维护一个整数计数器,并提供两个操作,wait/P 和 signal/V。 wait/P 递减计数器,如果计数器的新值为负,它将阻塞直到计数器再次变为非负。 signal/V 递增计数器,如果计数器在递增之前为负,则唤醒一个等待线程(如果一个线程在 wait/P 中被阻塞)。
这样做的效果是,在您的主线程快速连续获得多个 IRQ 的情况下,信号量将 "remember" 多次 signal/V 调用(作为计数器的正整数值),并允许工作线程在未来多次调用 wait/P 而不会阻塞。这样就没有信号 "forgotten".
Linux 提供一个信号量 API(通过 sem_init() 等),但它是为进程间同步而设计的,因此对于同步进程内的线程来说有点重量级一个单一的过程。幸运的是,使用 pthreads 互斥锁和条件变量很容易实现您自己的信号量,如下所示。
请注意,在这个玩具示例中,main() 线程扮演 Thread1 的角色,每次您在终端 return 中按 return 时,它都会假装收到 IRQ .子线程扮演 Threads2-4 的角色,每次 Thread1 向它们发出信号时,它们都会假装做一秒钟的 "work"。特别要注意,如果您快速连续多次按下 return,子线程将始终执行那么多 "work units",即使它们每秒只能执行一个工作单元。
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
struct example_semaphore
{
pthread_cond_t cond;
pthread_mutex_t mutex;
int count; // acccess to this is serialized by locking (mutex)
};
// Initializes the example_semaphore (to be called at startup)
void Init_example_semaphore(struct example_semaphore * s)
{
s->count = 0;
pthread_mutex_init(&s->mutex, NULL);
pthread_cond_init(&s->cond, NULL);
}
// V: Increments the example_semaphore's count by 1. If the pre-increment
// value was negative, wakes a process that was waiting on the
// example_semaphore
void Signal_example_semaphore(struct example_semaphore * s)
{
pthread_mutex_lock(&s->mutex);
if (s->count++ < 0) pthread_cond_signal(&s->cond);
pthread_mutex_unlock(&s->mutex);
}
// P: Decrements the example_semaphore's count by 1. If the new value of the
// example_semaphore is negative, blocks the caller until another thread calls
// Signal_example_semaphore()
void Wait_example_semaphore(struct example_semaphore * s)
{
pthread_mutex_lock(&s->mutex);
while(--s->count < 0)
{
pthread_cond_wait(&s->cond, &s->mutex);
if (s->count >= 0) break;
}
pthread_mutex_unlock(&s->mutex);
}
// This is the function that the worker-threads run
void * WorkerThreadFunc(void * arg)
{
int workUnit = 0;
struct example_semaphore * my_semaphore = (struct example_semaphore *) arg;
while(1)
{
Wait_example_semaphore(my_semaphore); // wait here until it's time to work
printf("Thread %p: just woke up and is working on work-unit #%i...\n", my_semaphore, workUnit++);
sleep(1); // actual work would happen here in a real program
}
}
static const int NUM_THREADS = 3;
int main(int argc, char ** argv)
{
struct example_semaphore semaphores[NUM_THREADS];
pthread_t worker_threads[NUM_THREADS];
// Setup semaphores and spawn worker threads
int i = 0;
for (i=0; i<NUM_THREADS; i++)
{
Init_example_semaphore(&semaphores[i]);
pthread_create(&worker_threads[i], NULL, WorkerThreadFunc, &semaphores[i]);
}
// Now we'll pretend to be receiving IRQs. We'll pretent to
// get one IRQ each time you press return.
while(1)
{
char buf[128];
fgets(buf, sizeof(buf), stdin);
printf("Main thread got IRQ, signalling child threads now!\n");
for (i=0; i<NUM_THREADS; i++) Signal_example_semaphore(&semaphores[i]);
}
}
我喜欢 jeremy 的回答,但它确实有一些不足,因为中断调度程序需要知道每个中断要增加多少信号量。 此外,每个增量都可能是一个内核调用,因此每个中断都有很多内核调用。
另一种方法是了解 pthread_cond_broadcast() 的工作原理。我在下面举了一个例子:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#ifndef NTHREAD
#define NTHREAD 5
#endif
pthread_mutex_t Lock;
pthread_cond_t CV;
int GlobalCount;
int Done;
#define X(y) do { if (y == -1) abort(); } while (0)
void *handler(void *x) {
unsigned icount;
X(pthread_mutex_lock(&Lock));
icount = 0;
while (!Done) {
if (icount < GlobalCount) {
X(pthread_mutex_unlock(&Lock));
icount++;
X(pthread_mutex_lock(&Lock));
} else {
X(pthread_cond_wait(&CV, &Lock));
}
}
X(pthread_mutex_unlock(&Lock));
return NULL;
}
int
main()
{
X(pthread_mutex_init(&Lock, NULL));
X(pthread_cond_init(&CV, NULL));
pthread_t id[NTHREAD];
int i;
for (i = 0; i < NTHREAD; i++) {
X(pthread_create(id+i, NULL, handler, NULL));
}
int c;
while ((c = getchar()) != EOF) {
X(pthread_mutex_lock(&Lock));
GlobalCount++;
X(pthread_mutex_unlock(&Lock));
X(pthread_cond_broadcast(&CV));
}
X(pthread_mutex_lock(&Lock));
Done = 1;
X(pthread_cond_broadcast(&CV));
X(pthread_mutex_unlock(&Lock));
for (i = 0; i < NTHREAD; i++) {
X(pthread_join(id[i], NULL));
}
return 0;
}