了解 pthreads 锁和条件变量

Understanding pthreads locks and condition variables

我在 C 中进行了关于线程、锁和条件变量的练习。我需要编写一个获取数据的程序,将其转换为链表,启动 3 个线程,每个线程计算列表中每个节点的结果,并且主线程在 evreyone 完成后打印结果。

这是主要功能:

int thread_finished_count;

// Lock and Conditional variable
pthread_mutex_t list_lock;
pthread_mutex_t thread_lock;
pthread_cond_t thread_cv;

int main(int argc, char const *argv[])
{
    node *list;
    int pairs_count, status;
    thread_finished_count = 0;

    /* get the data and start the threads */
    node *head = create_numbers(argc, argv, &pairs_count);
    list = head; // backup head for results
    pthread_t *threads = start_threads(&list);

    /* wait for threads and destroy lock */
    status = pthread_cond_wait(&thread_cv, &list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&thread_lock);
    chcek_status(status);

    /* print result in original list */
    print_results(head);

    /* cleanup */
    wait_for_threads(threads, NUM_THREADS);
    free_list(head);
    free(threads);

    return EXIT_SUCCESS;
}

请注意,create_numbers 功能正常工作,列表按预期工作。

这里是 start_thread 和 thread_function 代码:

pthread_t *start_threads(node **list)
{
    int status;
    pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * NUM_THREADS);
    check_malloc(threads);

    for (int i = 0; i < NUM_THREADS; i++)
    {
        status = pthread_create(&threads[i], NULL, thread_function, list);
        chcek_status(status);
    }
    return threads;
}

void *thread_function(node **list)
{
    int status, self_id = pthread_self();
    printf("im in %u\n", self_id);
    node *currentNode;

    while (1)
    {
        if (!(*list))
            break;
        status = pthread_mutex_lock(&list_lock);
        chcek_status(status);
        printf("list location %p thread %u\n", *list, self_id);
        if (!(*list))
        {
            status = pthread_mutex_unlock(&list_lock);
            chcek_status(status);
            break;
        }
        currentNode = (*list);
        (*list) = (*list)->next;
        status = pthread_mutex_unlock(&list_lock);
        chcek_status(status);
        currentNode->gcd = gcd(currentNode->num1, currentNode->num2);
        status = usleep(10);
        chcek_status(status);
    }
    status = pthread_mutex_lock(&thread_lock);
    chcek_status(status);
    thread_finished_count++;
    status = pthread_mutex_unlock(&thread_lock);
    chcek_status(status);
    if (thread_finished_count != 3)
        return NULL;
    status = pthread_cond_signal(&thread_cv);
    chcek_status(status);
    return NULL;
}
void chcek_status(int status)
{
    if (status != 0)
    {
        fputs("pthread_function() error\n", stderr);
        exit(EXIT_FAILURE);
    }
}

请注意 self_id 用于调试目的。

我的问题

  1. 我的主要问题是拆分工作。每个线程so从全局链表中取一个元素,计算gcd,然后继续取下一个元素。仅当我在 while 循环中解锁互斥锁后添加 usleep(10) 时,我才会获得此效果。如果我不添加 usleep,第一个线程将进入并完成所有工作,而其他线程将等待并在所有工作完成后进入。

请注意!:我考虑过可能创建第一个线程的选项,直到创建第二个线程,第一个线程已经完成所有作业。这就是为什么我在创建每个线程时使用 usleep(10) 添加“我在#threadID”检查。他们都进来了,但只有第一个在做所有的工作。 如果我在互斥锁解锁后执行 usleep,这是输出示例(注意不同的线程 ID)

with usleep

./v2 nums.txt
im in 1333593856
list location 0x7fffc4fb56a0 thread 1333593856
im in 1316685568
im in 1325139712
list location 0x7fffc4fb56c0 thread 1333593856
list location 0x7fffc4fb56e0 thread 1316685568
list location 0x7fffc4fb5700 thread 1325139712
list location 0x7fffc4fb5720 thread 1333593856
list location 0x7fffc4fb5740 thread 1316685568
list location 0x7fffc4fb5760 thread 1325139712
list location 0x7fffc4fb5780 thread 1333593856
list location 0x7fffc4fb57a0 thread 1316685568
list location 0x7fffc4fb57c0 thread 1325139712
list location 0x7fffc4fb57e0 thread 1333593856
list location 0x7fffc4fb5800 thread 1316685568
list location (nil) thread 1325139712
list location (nil) thread 1333593856
...
normal result output
...

这就是我在互斥锁定后注释掉 usleep 的输出(注意相同的线程 ID) 没有睡觉

  ./v2 nums.txt
im in 2631730944
list location 0x7fffe5b946a0 thread 2631730944
list location 0x7fffe5b946c0 thread 2631730944
list location 0x7fffe5b946e0 thread 2631730944
list location 0x7fffe5b94700 thread 2631730944
list location 0x7fffe5b94720 thread 2631730944
list location 0x7fffe5b94740 thread 2631730944
list location 0x7fffe5b94760 thread 2631730944
list location 0x7fffe5b94780 thread 2631730944
list location 0x7fffe5b947a0 thread 2631730944
list location 0x7fffe5b947c0 thread 2631730944
list location 0x7fffe5b947e0 thread 2631730944
list location 0x7fffe5b94800 thread 2631730944
im in 2623276800
im in 2614822656
...
normal result output
...
  1. 我的第二个问题是关于线程工作的顺序。我的练习要求我不要使用 join 来同步线程(仅在最后使用以“释放资源”),而是直接使用该条件变量。

我的目标是每个线程都将获取元素,进行计算,同时另一个线程将进入并获取另一个元素,新线程将获取每个元素(或至少接近那个)

感谢阅读,感谢您的帮助。

首先,您在持有锁的同时进行 gcd() 工作...所以 (a) 任何时候只有一个线程会做任何工作,尽管 (b) 这并不能完全解释为什么只有一个线程似乎完成(几乎)所有工作——正如 KamilCuk 所说,可能是因为没有什么工作要做,它(几乎)在第二个线程正确唤醒之前全部完成。 [更奇特的是,在线程 'a' 解锁互斥量和另一个线程开始 运行 之间可能存在一些延迟,这样线程 'a' 可以在另一个线程到达那里之前获取互斥量。]

POSIX 表示当解锁互斥体时,如果有服务员则 "the scheduling policy shall determine which thread shall acquire the mutex"。默认 "scheduling policy" 是(据我所知)定义的实现。

您可以尝试一些方法:(1) 使用 pthread_barrier_t 将所有线程保存在 thread_function() 的开头,直到它们所有运行宁; (2)pthread_mutex_unlock() 之后使用 sched_yield(void) 提示系统进入 运行 新的 运行 可用线程。

其次,您在任何情况下都不应将 'condition variable' 视为信号。要让 main() 知道所有线程都已完成,您需要一个计数——可以是 pthread_barrier_t;或者它可以是简单的整数,由互斥锁保护,并带有 'condition variable' 以在等待时保持主线程;或者它可以是一个计数(在 main() 中)和一个信号量(每个线程退出时发布一次)。

第三,你在main()中显示pthread_cond_wait(&cv, &lock);。到那时 main() 必须 拥有 lock... 而这很重要。但是:就目前而言,找到 list 空的 第一个 线程将启动 cv,并且 main() 将继续,即使其他线程正在还是运行宁。虽然一旦 main() 确实重新获取 lock,任何仍然处于 运行 状态的线程将退出或卡在 lock 上。 (一团糟。)


一般来说,使用'condition variable'的模板是:

    pthread_mutex_lock(&...lock) ;

    while (!(... thing we need ...))
      pthread_cond_wait(&...cond_var, &...lock) ;

    ... do stuff now we have what we need ....

    pthread_mutex_unlock(&...lock) ;

注意:'condition variable' 没有值...尽管名称如此,但它 不是 标志,表示某些条件为真。 'condition variable' 本质上是一个等待重新启动的线程队列。当 'condition variable' 发出信号时,至少会重新启动一个等待线程——但是如果没有线程在等待,什么都不会发生,特别是(所谓的) 'condition variable' 保留 没有记忆 信号。

在新代码中,按照上述模板,main() 应该:

    /* wait for threads .... */

    status = pthread_mutex_lock(&thread_lock);
    chcek_status(status);

    while (thread_finished_count != 3)
      {
        pthread_cond_wait(&thread_cv, &thread_lock) ;
        chcek_status(status);
      } ;

    status = pthread_mutex_unlock(&thread_lock) ;
    chcek_status(status);

那么这是怎么回事?

  1. main() 正在等待 thread_finished_count == 3

  2. thread_finished_countthread_lock 互斥锁的共享变量 "protected"。

    ...因此它在互斥锁下的 thread_function() 中递增。

    ...并且main() 也必须在互斥量下读取它。

  3. 如果main()找到thread_finished_count != 3它必须等待。

    做到这一点:pthread_cond_wait(&thread_cv, &thread_lock),其中:

    • 解锁thread_lock

    • 将线程置于等待线程的 thread_cv 队列中。

    原子地

  4. thread_function() 执行 pthread_cond_signal(&thread_cv) 时,它会唤醒等待线程。

  5. main()线程唤醒后,会先重新获取thread_lock...

    ...所以它可以继续重新读取 thread_finished_count,看看现在是否是 3

FWIW:我建议不要销毁互斥体等,直到所有线程都加入后。

我深入研究了 glibc(至少在 Linux 和 x86_64 上的 v2.30)如何实现 pthread_mutex_lock()_unlock()

原来 _lock() 是这样工作的:

  if (atomic_cmp_xchg(mutex->lock, 0, 1))
    return <OK> ;             // mutex->lock was 0, is now 1

  while (1)
    {
      if (atomic_xchg(mutex->lock, 2) == 0)
        return <OK> ;        // mutex->lock was 0, is now 2

      ...do FUTEX_WAIT(2)... // suspend thread iff mutex->lock == 2...
    } ;

_unlock() 的工作方式如下:

  if (atomic_xchg(mutex->lock, 0) == 2)  // set mutex->lock == 0
    ...do FUTEX_WAKE(1)...               // if may have waiter(s) start 1

现在:

  • mutex->lock:0 => 解锁,1 => 锁定但没有服务员,2 => 锁定服务员

    'locked-but-no-waiters'针对没有锁争用的情况进行了优化,不需要在_unlock().

  • 中做FUTEX_WAKE
  • _lock()/_unlock()函数在库中——它们不在内核中。

    ...特别是,互斥锁的所有权是库的事情,不是内核。

  • FUTEX_WAIT(2) 是对内核的调用,它会将线程放置在与互斥体关联的待处理队列中,除非 mutex->lock != 2.

    内核检查 mutex->lock == 2原子地 将线程添加到队列中。这处理在 atomic_xchg(mutex->lock, 2) 之后调用 _unlock() 的情况。

  • FUTEX_WAKE(1)也是对内核的调用,futex手册页告诉我们:

    FUTEX_WAKE (since Linux 2.6.0)

    This operation wakes at most 'val' of the waiters that are waiting ... No guarantee is provided about which waiters are awoken (e.g., a waiter with a higher scheduling priority is not guaranteed to be awoken in preference to a waiter with a lower priority).

    其中 'val' 在这种情况下为 1。

    虽然文档说 "no guarantee about which waiters are awoken",队列似乎至少是 FIFO。

特别注意:

  1. _unlock() 将互斥量传递给 FUTEX_WAKE.

    [=82 启动的线程=]
  2. 一旦被唤醒,线程会再次尝试获取锁...

    ...但可能会被任何其他 运行 线程击败 - 包括刚刚执行 _unlock().

  3. 的线程

我相信这就是您没有看到跨线程共享工作的原因。每个人要做的工作很少,一个线程可以解锁互斥锁,完成工作并返回以再次锁定互斥锁 before 被解锁唤醒的线程可以获得继续并成功锁定互斥量。