Pthreads-互斥锁但变量不改变

Pthreads- Mutex Locks but variables don't change

我正在编写一个非常简单的程序来演示我从 C++ 移植回 C 的 Pthreads 实现。

我创建了两个锁步线程并给它们两个作业

每一步递增a1一次

one每步递减a2一次

在同步阶段(当 t1 和 t2 的互斥量都被锁定时)我比较 a1 和 a2 看我们是否应该停止步进。

我想知道我在这里是不是疯了,因为不仅变量 在步进和锁定后并不总是改变 ,而且它们有时 在不同的地方改变rates 就好像线程是 运行 即使在锁定之后。

编辑:是的,我对此进行了研究。是的,C++ 实现有效。是的,C++ 实现与这个几乎相同,但我必须在 c 中强制转换 PTHREAD_MUTEX_INITIALIZER 和 PTHREAD_COND_INITIALIZER 并将其作为第一个参数传递给每个函数。我花了一段时间尝试调试它(没有推出 gdb)但无济于事。

#ifndef LOCKSTEPTHREAD_H
#define LOCKSTEPTHREAD_H
#include <pthread.h>
#include <stdio.h>
typedef struct {
    pthread_mutex_t myMutex;
    pthread_cond_t myCond;
    pthread_t myThread;
    int isThreadLive;
    int shouldKillThread;
    void (*execute)();
} lsthread;
void init_lsthread(lsthread* t);
void start_lsthread(lsthread* t);
void kill_lsthread(lsthread* t);
void kill_lsthread_islocked(lsthread* t);
void lock(lsthread* t);
void step(lsthread* t);
void* lsthread_func(void* me_void);
#ifdef LOCKSTEPTHREAD_IMPL
//function declarations

void init_lsthread(lsthread* t){
    //pthread_mutex_init(&(t->myMutex), NULL);
    //pthread_cond_init(&(t->myCond), NULL);
    t->myMutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
    t->myCond = (pthread_cond_t)PTHREAD_COND_INITIALIZER;
    t->isThreadLive = 0;
    t->shouldKillThread = 0;
    t->execute = NULL;
}
void destroy_lsthread(lsthread* t){
    pthread_mutex_destroy(&t->myMutex);
    pthread_cond_destroy(&t->myCond);
}
void kill_lsthread_islocked(lsthread* t){
    if(!t->isThreadLive)return;
    //lock(t);
    t->shouldKillThread = 1;
    step(t);
    pthread_join(t->myThread,NULL);
    t->isThreadLive = 0;
    t->shouldKillThread = 0;
}

void kill_lsthread(lsthread* t){
    if(!t->isThreadLive)return;
    lock(t);
    t->shouldKillThread = 1;
    step(t);
    pthread_join(t->myThread,NULL);
    t->isThreadLive = 0;
    t->shouldKillThread = 0;
}
void lock(lsthread* t){
    if(pthread_mutex_lock(&t->myMutex))
        puts("\nError locking mutex.");
}

void step(lsthread* t){
    if(pthread_cond_signal(&(t->myCond)))
        puts("\nError signalling condition variable");
    if(pthread_mutex_unlock(&(t->myMutex)))
        puts("\nError unlocking mutex");
}
void* lsthread_func(void* me_void){
    lsthread* me = (lsthread*) me_void;
    int ret;
    if (!me)pthread_exit(NULL);
    if(!me->execute)pthread_exit(NULL);
    while (!(me->shouldKillThread)) {
        ret = pthread_cond_wait(&(me->myCond), &(me->myMutex));
        if(ret)pthread_exit(NULL);
        if (!(me->shouldKillThread) && me->execute)
            me->execute();
    }
    pthread_exit(NULL);
}
void start_lsthread(lsthread* t){
    if(t->isThreadLive)return;
    t->isThreadLive = 1;
    t->shouldKillThread = 0;
    pthread_create(
        &t->myThread,
        NULL,
        lsthread_func,
        (void*)t
    );
}
#endif
#endif

这是我的驱动程序:

#define LOCKSTEPTHREAD_IMPL
#include "include/lockstepthread.h"
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
unsigned char a1, a2;
void JobThread1(){
    unsigned char copy = a1;
    copy++;
    a1 = copy;
}
void JobThread2(){
    unsigned char copy = a2;
    copy--;
    a2 = copy;
}
int main(){
    char inputline[2048];
    inputline[2047] = '[=11=]';
    lsthread t1, t2;
    init_lsthread(&t1);
    init_lsthread(&t2);
    t1.execute = JobThread1;
    t2.execute = JobThread2;
    printf(
    "\nThis program demonstrates threading by having"
    "\nTwo threads \"walk\" toward each other using unsigned chars."
    "\nunsigned Integer overflow guarantees the two will converge."
    );
    printf("\nEnter a number for thread 1 to process: ");
    fgets(inputline, 2047,stdin);
    a1 = (unsigned char)atoi(inputline);
    printf("\nEnter a number for thread 2 to process: ");
    fgets(inputline, 2047,stdin);
    a2 = (unsigned char)atoi(inputline);
    start_lsthread(&t1);
    start_lsthread(&t2);
    unsigned int i = 0;
    lock(&t1);
    lock(&t2);
    do{
        printf("\n%u: a1 = %d, a2 = %d",i++,(int)a1,(int)a2);
        fflush(stdout);
        step(&t1);
        step(&t2);
        lock(&t1);
        lock(&t2);
    }while(a1 < a2);
    kill_lsthread_islocked(&t1);
    kill_lsthread_islocked(&t2);
    destroy_lsthread(&t1);
    destroy_lsthread(&t2);
    return 0;
}

示例程序用法:

Enter a number for thread 1 to process: 5

Enter a number for thread 2 to process: 10

0: a1 = 5, a2 = 10
1: a1 = 5, a2 = 10
2: a1 = 5, a2 = 10
3: a1 = 5, a2 = 10
4: a1 = 5, a2 = 10
5: a1 = 5, a2 = 10
6: a1 = 6, a2 = 9
7: a1 = 6, a2 = 9
8: a1 = 7, a2 = 9
9: a1 = 7, a2 = 9
10: a1 = 7, a2 = 9
11: a1 = 7, a2 = 9
12: a1 = 8, a2 = 9

那么,怎么回事?

一般来说,听起来您真正要找的是一个障碍。尽管如此,我还是照原样回答问题。

Yes, the C++ implementation works. Yes, the C++ implementation is nearly identical to this one, but I had to cast PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER in c and pass this as the first argument to every function. I spent a while trying to debug this (short of whipping out gdb) to no avail.

这似乎不太可能。所提供的代码中到处都有数据竞争和未定义的行为,无论是解释为 C 还是 C++。

总体设计

既然您提供了一个显式的 lock() 函数,这似乎是合理的,那么您也应该提供一个显式的 unlock() 函数。任何其他希望在锁定互斥锁的情况下调用的函数都应该 return 锁定互斥锁,以便调用者可以显式地将 lock() 调用与 unlock() 调用配对。不遵守此模式会引发错误。

特别是,step() 不应该解锁互斥锁,除非它也锁定了它,但我认为 non-locking 版本将适合这个目的。

初始化

I had to cast PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER in c

不,你没有,因为你不能,至少如果pthread_mutex_tpthread_cond_t 是结构类型。结构类型的初始值设定项不是值。它们没有类型并且不能被强制转换。但是您 可以 从它们中形成复合文字,这就是您无意中所做的。 这不是为 pthread_mutex_tpthread_cond_t 赋值的合规方式。* 初始化宏是指定仅用于在其声明中初始化变量。这就是“初始化程序”在这种情况下的意思。

示例:

pthread_mutex_t mutex = PTREAD_MUTEX_INITIALIZER;

示例:

struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cv;
} example = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

要在任何其他上下文中初始化互斥锁或条件变量对象,需要使用相应的初始化函数,pthread_mutex_init()pthread_cond_init()

数据竞赛

Non-atomic 多个 concurrently-running 线程对共享数据的访问必须受到互斥锁或其他同步对象的保护,如果任何访问是写入的话(例外情况适用于对互斥锁和其他同步对象的访问他们自己)。示例中的共享数据包括 file-scope 变量 a1a2,以及 lsthread 实例的大多数成员。您的 lsthread_func 和驱动程序在访问这些共享数据之前有时都无法锁定适当的互斥锁,并且涉及的某些访问确实是写入,因此会发生未定义的行为。观察到 a1a2 的意外值是该未定义行为的完全合理的表现。

条件变量用法

调用 pthread_cond_wait() 的线程必须在锁定指定的互斥锁的同时执行此操作。您的 lsthread_func() 不符合该要求,因此会出现更多未定义的行为。如果你很幸运,那可能表现为立即的虚假唤醒。

说到虚假唤醒,您没有防范它们。如果确实发生了,那么 lsthread_func() 会愉快地继续执行其循环的另一次迭代。为避免这种情况,您需要在某处共享数据,条件变量的 condition 就基于该共享数据。 CV 的标准用法是在等待之前检查谓词,并在醒来后循环并再次检查它,必要时重复检查,直到谓词评估为真才继续。

同步步进

工作线程之间不直接同步,因此只有驱动程序才能确保一个不运行领先于另一个。 但它没有。 驱动程序根本不做任何事情来确保任一线程已完成一个步骤,然后再向两个线程发出信号以执行另一个步骤。条件变量不存储信号,因此如果由于某种不幸的调度或所涉及任务的性质,一个线程应该比另一个线程领先一步,它将保持领先,直到并且除非错误恰好被自发平衡另一边失误。

可能您想添加一个 lsthread_wait() 函数来等待线程完成一个步骤。这将涉及从相反方向使用 CV。

总体而言,您可以通过

为 single-stepping 提供(更好)
  • 将成员添加到类型 lsthread 以指示线程是否应该或正在执行步骤 vs。是否在步骤之间,应该等待。

    typedef struct {
        // ...
        _Bool should_step;
    } lsthread;
    
  • 加上前面提到的lsthread_wait(),可能是这样的:

    // The calling thread must hold t->myMutex locked
    void lsthread_wait(lsthread *t) {
        // Wait, if necessary, for the thread to complete a step
        while (t->should_step) {
            pthread_cond_wait(&t->myCond, &t->myMutex);
        }
        assert(!t->should_step);
    
        // Prepare to perform another step
        t->should_step = 1;
    }
    
  • 这将与 lsthread_func() 的修订版本配对:

     void* lsthread_func(void* me_void){
         lsthread* me = (lsthread*) me_void;
         if (!me) pthread_exit(NULL);
    
         lock(me); // needed to protect access to *me members and to globals
         while (!me->shouldKillThread && me->execute) {
             while (!me->should_step && !me->shouldKillThread) {
                 int ret = pthread_cond_wait(&(me->myCond), &(me->myMutex));
                 if (ret) {
                     unlock(me);  // mustn't forget to unlock
                     pthread_exit(NULL);
                 }
             }
             assert(me->should_step || me->shouldKillThread);
    
             if (!me->shouldKillThread && me->execute) {
                 me->execute();
             }
    
             // Mark and signal step completed
             me->should_step = 0;
             ret = pthread_cond_broadcast(me->myCond);
             if (ret) break;
         }
         unlock(me);
    
         pthread_exit(NULL);
     }
    
  • 修改 step() 以避免解锁互斥体。

  • 修改驱动程序循环以适当使用新的等待函数

    lock(&t1);
    lock(&t2);
    do {
        printf("\n%u: a1 = %d, a2 = %d", i++, (int) a1, (int) a2);
        fflush(stdout);
        step(&t1);
        step(&t2);
        lsthread_wait(&t1);
        lsthread_wait(&t2);
    } while (a1 < a2);  // both locks must be held when this condition is evaluated
    kill_lsthread_islocked(&t2);
    kill_lsthread_islocked(&t1);
    unlock(&t2);
    unlock(&t1);
    

这不一定是所有需要的更改,但我认为我已经涵盖了所有关键点。

最后的笔记

以上建议基于示例程序,其中不同的工作线程不会访问任何相同的共享数据。这使得使用 per-thread 互斥来保护共享数据变得可行塞斯。如果工作人员访问任何相同的共享数据,并且那些或任何线程 运行 与他们同时修改了相同的数据,那么 per-worker-thread 互斥体将无法提供足够的保护。


* 如果 pthread_mutex_tpthread_cond_t 是指针或整数类型,这是允许的,那么编译器会在没有一个演员表(在那种情况下实际上 一个演员表),但就 pthreads 而言,这些分配仍然是 non-conforming。