修改的正确性 consumer/producer

Correctness of modified consumer/producer

我正在创建一个 Sound class 来演奏音符,并希望就我的设计的正确性和简洁性提供反馈。这个 class 在两个方面不同于典型的 consumer/producer:

  1. 消费者应该响应事件,例如关闭线程,或者永远继续。当队列为空时,典型的 consumer/producer 退出。例如,在 queue.get 中等待的线程无法处理其他通知。

  2. 生产者提交的每组笔记都应覆盖队列中剩余的任何未处理笔记。

最初我使用 queue 模块让消费者一次处理一张便条。我发现在没有任何竞争的情况下不断获取和释放锁是低效的,并且如前所述,queue.get 会阻止等待其他事件。因此,我没有以此为基础,而是将其重写为:

import threading


queue = []
condition = threading.Condition()
interrupt = threading.Event()
stop = threading.Event()

def producer():
    while some_condition:
        ns = get_notes() # [(float,float)]
        with condition:
            queue.clear()
            queue.append(ns)
            interrupt.set()
            condition.notify()
    with condition:
        stop.set()
        condition.notify()
    consumer.join()


def consumer():
    while not stop.is_set():
        with condition:
            while not (queue or stop.is_set()):
                condition.wait()
            if stop.is_set():
                break
            interrupt.clear()
            ns = queue.pop()
        ss = gen_samples(ns) # iterator/fast
        for b in grouper(ss, size/2):
            if interrupt.is_set() or stop.is_set()
                break
            stream.write(b)


thread = threading.Thread(target=consumer)
thread.start()
producer()

我的问题如下:

  1. 这是线程安全的吗?我想特别指出我使用 is_set 没有锁或同步(在 for 循环中)。

  2. 事件可以用布尔变量代替吗?我相信,因为两个线程(数据竞争)中的冲突写入都受到 condition 变量的保护。设置和检查事件之间存在竞争条件,但我认为它不会影响程序流程。

  3. 是否有更高效的approach/algorithm利用来自threading模块的不同同步原语?

edit:发现并修复了

中描述的可能死锁

分析Python中的线程安全可以考虑全局解释器锁(GIL):没有两个线程会同时执行Python代码。对变量或对象字段的分配实际上是原子的(没有分配一半的变量)并且更改会立即有效地传播到其他线程。

这意味着您对 Event.is_set() 的使用已经等同于 使用普通布尔值。事件是由条件保护的布尔值。 is_set() 方法直接检查布尔值。 set() 方法获取条件,设置布尔值,并通知所有等待线程。 wait() 方法等待 set() 方法被调用。 clear() 方法获取条件并取消设置布尔值。由于您从不 wait() 任何事件,并且设置布尔值是原子的,因此事件中的条件实际上未被使用。

这可能会摆脱几个锁,但并不是真正的巨大效率胜利。 Condition 仍然是对锁的抽象,但是内置的 Queue 类型直接使用锁。因此,我假设内置队列的性能不亚于您的解决方案,即使对于单个消费者也是如此。

内置队列的主要问题是“在没有任何竞争的情况下持续获取和释放锁 [is] 低效”。这在两个方面是错误的:

  1. 由于 Python 的 GIL,在这两种情况下几乎没有竞争。
  2. 获取无竞争锁非常有效。

因此,虽然您的解决方案可能足够正确(我看不到出现死锁的机会),但它不太可能特别有效。 (只有一些小错误,比如使用 stop 而不是 stop.is_set() 和一些语法错误。)

如果您发现 Python 线程性能不佳,这可能是因为 CPython,而不是因为队列类型。我已经提到,由于 GIL,一次只能有一个线程 运行 。如果多个线程想要 运行,它们必须由操作系统调度并获取 GIL。每个线程将等待 5ms,然后再要求 运行ning 线程放弃 GIL(以与您的中断标志非常相似的方式)。然后线程可以做一些有用的工作,比如为一个不能被其他线程中断的关键部分获取锁。

可能,解决方案是避免 CPython 的线程。

  • 如果您有多个 CPU 绑定任务,则必须使用多个进程。 CPython 的线程不会运行 并行。但是,进程之间的通信成本更高。
  • 考虑是否可以直接组合生产者+消费者,可能使用生成器等功能。
  • 为了更轻松地在同一个线程中处理多个任务,请考虑使用 async/await。事件循环由 asyncio 模块提供。这与 Python 的线程一样快,但需要注意的是任务不会相互抢占(中断)。但这可能是一个优势:由于任务只能在 await 处暂停,您不需要大多数锁,并且更容易推断代码的正确性。缺点是 async/await 可能比使用线程有更高的延迟。
  • Python 有一个“执行器”的概念,它使得 运行 任务在单独的线程(对于 I/O-bound 任务)或单独的进程(对于 CPU-绑定任务)。
  • 要在多个进程之间进行通信,请使用 multiprocessing 模块中的类型(例如队列、连接或值)。