释放多个锁而不引起优先级倒置

Releasing multiple locks without causing priority inversion

简短版:如何从单个线程中释放多个锁,而不会在中途被抢占?

我有一个程序是为 运行 在 N 核机器上设计的。它由一个主线程和N个工作线程组成。每个线程(包括主线程)都有一个可以阻塞的信号量。通常,每个工作线程都在递减其信号量时被阻塞,而主线程是 运行ning。不过,主线程时不时地应该唤醒工作线程在一定时间内做他们的事情,然后阻塞在它自己的信号量上等待它们全部返回睡眠状态。像这样:

def main_thread(n):
    for i = 1 to n:
        worker_semaphore[i] = semaphore(0)
        spawn_thread(worker_thread, i)
    main_semaphore = semaphore(0)

    while True:
        ...do some work...
        workers_to_wake = foo()
        for i in workers_to_wake:
            worker_semaphore[i].increment() # wake up worker n
        for i in workers_to_wake:
            main_semaphore.decrement() # wait for all workers

def worker_thread(i):
    while True:
        worker_semaphore(i).decrement() # wait to be woken
        ...do some work...
        main_semaphore.increment() # report done with step

一切都很好。问题是,其中一个被唤醒的 worker 可能最终会在唤醒 worker 的过程中抢占主线程:例如,当 Windows 调度程序决定提高该 worker 的优先级时,可能会发生这种情况。这不会导致死锁,但效率很低,因为在抢占工作线程完成其工作之前,其余线程一直处于休眠状态。它基本上是优先级倒置,主线程等待其中一个工作线程,而一些工作线程等待主线程。

我可能会想出 OS- 和调度程序特定的技巧,例如在 Windows 下禁用优先级提升,以及摆弄线程优先级和处理器亲和性,但我会就像跨平台的、健壮和干净的东西。所以:我怎样才能原子地唤醒一堆线程?

当唤醒算法复杂度为O(n) 时,无法使用多个同步对象(信号量)。不过解决方法很少。

一次全部释放

我不确定 Python 是否有必要的方法(你的问题是 Python 特定的吗?),但通常,信号量的操作带有参数指定 decrements/increments.因此,您只需将所有线程放在同一个信号量上并一起唤醒它们。类似的做法是使用条件变量和notify all.

事件循环

如果您仍然希望能够单独控制每个线程,但喜欢一对多通知的方法,请尝试异步库 I/O,例如 libuv(和 its Python counterpart).在这里,您可以创建一次唤醒所有线程的单个事件,还可以为每个线程创建其单独的事件,然后只需在每个线程的事件循环中等待两个(或更多)事件对象。 另一个库是 pevents,它在 pthreads 的条件变量之上实现 WaitForMultipleObjects

代表醒来

另一种方法是将 O(n) 算法替换为树状算法 ( O(log n) ),其中每个线程仅唤醒固定数量的其他线程,但委托它们唤醒其他线程。在极端情况下,主线程只能唤醒一个其他线程,这将唤醒其他所有线程或启动连锁反应。如果您想以其他线程的唤醒延迟为代价来减少主线程的延迟,它会很有用。

Reader/Writer锁定

我通常在 POSIX 系统上用于一对多关系的解决方案是 reader/writer 锁。令我惊讶的是它们并不是一个完整的通用语言,但大多数语言要么实现一个版本,要么至少有一个包可用于在任何存在的原语上实现它们,例如 python 的 prwlock:

from prwlock import RWLock

def main_thread(n):
    for i = 1 to n:
        worker_semaphore[i] = semaphore(0)
        spawn_thread(worker_thread, i)
    main_lock = RWLock()

    while True:
        main_lock.acquire_write()
        ...do some work...   
        workers_to_wake = foo()
        # The above acquire could be moved as low as here,
        # depending on how independent the above processing is..            
        for i in workers_to_wake:
            worker_semaphore[i].increment() # wake up worker n

        main_lock.release()


def worker_thread(i):
    while True:
        worker_semaphore(i).decrement() # wait to be woken
        main_lock.acquire_read()
        ...do some work...
        main_lock.release() # report done with step

障碍

Barriers 似乎是 Python 最接近预期的内置机制来阻止所有线程,直到它们都被提醒,但是:

  1. 它们是一个非常不寻常的解决方案,因此它们会使您的 code/experience 更难翻译成其他语言。

  2. 我不想在这种情况下使用它们,因为要唤醒的线程数不断变化。鉴于你的 n 听起来很小,我很想使用常量 Barrier(n) 并通知所有线程检查它们是否是 运行 这个循环。但是:

  3. 我担心使用屏障会适得其反,因为任何被外部事物阻止的线程都会阻止它们,甚至具有资源依赖性提升的调度程序也可能会错过这种关系。需要所有 n 都到达障碍只会使情况变得更糟。

TL;博士

如果你真的需要从你的工人那里得到尽可能多的东西,只需使用一个事件信号量、一个控制块和一个屏障而不是你的信号量。但请注意,这是一个更脆弱的解决方案,因此您需要在任何潜在收益与这一缺点之间取得平衡。

上下文

首先,我需要总结一下我们讨论中更广泛的背景...

您有一个 Windows 图形应用程序。它具有所需的帧速率,因此您需要主线程以该速率 运行,以精确的时间间隔安排所有工作人员,以便他们在刷新间隔内完成工作。这意味着您对每个线程的开始和执行时间都有非常严格的限制。此外,您的工作线程并不完全相同,因此您不能只使用一个工作队列。

问题

与任何现代操作系统一样,Windows 具有多种 synchronization primitives。然而,其中 none 直接提供了一种同时通知多个原语的机制。查看其他操作系统,我看到了类似的模式;它们都提供了等待多个原语的方法,但是none提供了触发它们的原子方法。

那么我们可以做些什么呢?您需要解决的问题是:

  1. 精确计时所有必需工作人员的启动时间。
  2. 在下一帧中刺激实际需要 运行 的工人。

选项

问题 1 最明显的解决方案是只使用单个事件信号量,但您也可以使用 read/write 锁(通过在工作人员完成后获取写锁并让工作人员使用读锁)。所有其他选项不再是原子的,因此需要进一步同步以强制线程执行您想要的操作 - 就像 lossleader 对信号量内部锁的建议。

但是由于您的应用程序时间紧迫,我们想要一个尽可能减少上下文切换的最佳解决方案,所以让我们看看是否可以使用其中任何一个来解决问题 2...您如何选择如果我们只有一个事件信号量或 read/write 锁,哪个工作线程应该 运行 来自主线程?

嗯... read/write 锁是一种很好的方式,可以让一个线程将一些关键数据写入控制块,并让许多其他线程从中读取。为什么不只使用一个简单的布尔标志数组(一个用于每个工作线程)让您的主线程更新每一帧?可悲的是,您仍然需要停止执行工人,直到计时器弹出。简而言之,我们又回到了信号量和锁定解决方案。

但是,由于您的应用程序的性质,您可以多做一步。您可以相信这样一个事实,即您知道您的工作人员不会 运行 宁在您的时间片之外,而是使用事件信号量作为一种粗略的锁形式。

最后的优化(如果您的环境支持它们)是使用屏障而不是主信号量。你知道n个线程都需要空闲才能继续,所以就坚持吧。

一个解决方案

应用以上内容,您的伪代码将如下所示:

def main_thread(n):
    main_event = event()
    for i = 1 to n:
        worker_scheduled[i] = False
        spawn_thread(worker_thread, i)
    main_barrier = barrier(n+1)

    while True:
        ...do some work...
        workers_to_wake = foo()
        for i in workers_to_wake:
            worker_scheduled[i] = True
        main_event.set()
        main_barrier.enter() # wait for all workers
        main_event.reset()

def worker_thread(i):
    while True:
       main_event.wait()
       if worker_scheduled[i]:
            worker_scheduled[i] = False
            ...do some work...
       main_barrier.enter() # report finished for this frame.
       main_event.reset() # to catch the case that a worker is scheduled before the main thread

由于 worker_scheduled 阵列没有明确的监管,这是一个更加脆弱的解决方案。

因此,如果我不得不从我的 CPU 中挤出最后一盎司的处理,我个人只会使用它,但听起来这正是您正在寻找的。

Peter Brittain 的解决方案,加上 Anton 关于 "tree-like wakeup" 的建议,使我想到了另一个解决方案:链式唤醒。基本上,不是主线程做所有的唤醒,它只唤醒一个线程;然后每个线程负责唤醒下一个线程。这里优雅的一点是,只有一个挂起的线程准备好 运行,因此线程很少会以切换核心而告终。事实上,即使其中一个工作线程与主线程共享亲缘关系,这也适用于严格的处理器亲缘关系。

我做的另一件事是使用工作线程在休眠前递减的原子计数器;这样,只有最后一个唤醒主线程,所以也没有机会多次唤醒主线程来等待更多的信号量。

workers_to_wake = []
main_semaphore = semaphore(0)
num_woken_workers = atomic_integer()

def main_thread(n):
    for i = 1 to n:
        worker_semaphore[i] = semaphore(0)
        spawn_thread(worker_thread, i)
    main_semaphore = semaphore(0)

    while True:
        ...do some work...

        workers_to_wake = foo()
        num_woken_workers.atomic_set(len(workers_to_wake)) # set completion countdown
        one_to_wake = workers_to_wake.pop()
        worker_semaphore[one_to_wake].increment() # wake the first worker
        main_semaphore.decrement() # wait for all workers

def worker_thread(i):
    while True:
        worker_semaphore[i].decrement() # wait to be woken
        if workers_to_wake.len() > 0: # more pending wakeups
            one_to_wake = workers_to_wake.pop()
            worker_semaphore[one_to_wake].increment() # wake the next worker

        ...do some work...

        if num_woken_workers.atomic_decrement() == 0: # see whether we're the last one
            main_semaphore.increment() # report all done with step