修改的正确性 consumer/producer
Correctness of modified consumer/producer
我正在创建一个 Sound
class 来演奏音符,并希望就我的设计的正确性和简洁性提供反馈。这个 class 在两个方面不同于典型的 consumer/producer:
消费者应该响应事件,例如关闭线程,或者永远继续。当队列为空时,典型的 consumer/producer 退出。例如,在 queue.get
中等待的线程无法处理其他通知。
生产者提交的每组笔记都应覆盖队列中剩余的任何未处理笔记。
最初我使用 queue
模块让消费者一次处理一张便条。我发现在没有任何竞争的情况下不断获取和释放锁是低效的,并且如前所述,queue.get
会阻止等待其他事件。因此,我没有以此为基础,而是将其重写为:
import threading
queue = []
condition = threading.Condition()
interrupt = threading.Event()
stop = threading.Event()
def producer():
while some_condition:
ns = get_notes() # [(float,float)]
with condition:
queue.clear()
queue.append(ns)
interrupt.set()
condition.notify()
with condition:
stop.set()
condition.notify()
consumer.join()
def consumer():
while not stop.is_set():
with condition:
while not (queue or stop.is_set()):
condition.wait()
if stop.is_set():
break
interrupt.clear()
ns = queue.pop()
ss = gen_samples(ns) # iterator/fast
for b in grouper(ss, size/2):
if interrupt.is_set() or stop.is_set()
break
stream.write(b)
thread = threading.Thread(target=consumer)
thread.start()
producer()
我的问题如下:
这是线程安全的吗?我想特别指出我使用 is_set
没有锁或同步(在 for 循环中)。
事件可以用布尔变量代替吗?我相信,因为两个线程(数据竞争)中的冲突写入都受到 condition
变量的保护。设置和检查事件之间存在竞争条件,但我认为它不会影响程序流程。
是否有更高效的approach/algorithm利用来自threading
模块的不同同步原语?
edit:发现并修复了
中描述的可能死锁
分析Python中的线程安全可以考虑全局解释器锁(GIL):没有两个线程会同时执行Python代码。对变量或对象字段的分配实际上是原子的(没有分配一半的变量)并且更改会立即有效地传播到其他线程。
这意味着您对 Event.is_set()
的使用已经等同于 使用普通布尔值。事件是由条件保护的布尔值。 is_set()
方法直接检查布尔值。 set()
方法获取条件,设置布尔值,并通知所有等待线程。 wait()
方法等待 set()
方法被调用。 clear()
方法获取条件并取消设置布尔值。由于您从不 wait()
任何事件,并且设置布尔值是原子的,因此事件中的条件实际上未被使用。
这可能会摆脱几个锁,但并不是真正的巨大效率胜利。 Condition 仍然是对锁的抽象,但是内置的 Queue 类型直接使用锁。因此,我假设内置队列的性能不亚于您的解决方案,即使对于单个消费者也是如此。
内置队列的主要问题是“在没有任何竞争的情况下持续获取和释放锁 [is] 低效”。这在两个方面是错误的:
- 由于 Python 的 GIL,在这两种情况下几乎没有竞争。
- 获取无竞争锁非常有效。
因此,虽然您的解决方案可能足够正确(我看不到出现死锁的机会),但它不太可能特别有效。 (只有一些小错误,比如使用 stop
而不是 stop.is_set()
和一些语法错误。)
如果您发现 Python 线程性能不佳,这可能是因为 CPython,而不是因为队列类型。我已经提到,由于 GIL,一次只能有一个线程 运行 。如果多个线程想要 运行,它们必须由操作系统调度并获取 GIL。每个线程将等待 5ms,然后再要求 运行ning 线程放弃 GIL(以与您的中断标志非常相似的方式)。然后线程可以做一些有用的工作,比如为一个不能被其他线程中断的关键部分获取锁。
可能,解决方案是避免 CPython 的线程。
- 如果您有多个 CPU 绑定任务,则必须使用多个进程。 CPython 的线程不会运行 并行。但是,进程之间的通信成本更高。
- 考虑是否可以直接组合生产者+消费者,可能使用生成器等功能。
- 为了更轻松地在同一个线程中处理多个任务,请考虑使用 async/await。事件循环由 asyncio 模块提供。这与 Python 的线程一样快,但需要注意的是任务不会相互抢占(中断)。但这可能是一个优势:由于任务只能在
await
处暂停,您不需要大多数锁,并且更容易推断代码的正确性。缺点是 async/await 可能比使用线程有更高的延迟。
- Python 有一个“执行器”的概念,它使得 运行 任务在单独的线程(对于 I/O-bound 任务)或单独的进程(对于 CPU-绑定任务)。
- 要在多个进程之间进行通信,请使用
multiprocessing
模块中的类型(例如队列、连接或值)。
我正在创建一个 Sound
class 来演奏音符,并希望就我的设计的正确性和简洁性提供反馈。这个 class 在两个方面不同于典型的 consumer/producer:
消费者应该响应事件,例如关闭线程,或者永远继续。当队列为空时,典型的 consumer/producer 退出。例如,在
queue.get
中等待的线程无法处理其他通知。生产者提交的每组笔记都应覆盖队列中剩余的任何未处理笔记。
最初我使用 queue
模块让消费者一次处理一张便条。我发现在没有任何竞争的情况下不断获取和释放锁是低效的,并且如前所述,queue.get
会阻止等待其他事件。因此,我没有以此为基础,而是将其重写为:
import threading
queue = []
condition = threading.Condition()
interrupt = threading.Event()
stop = threading.Event()
def producer():
while some_condition:
ns = get_notes() # [(float,float)]
with condition:
queue.clear()
queue.append(ns)
interrupt.set()
condition.notify()
with condition:
stop.set()
condition.notify()
consumer.join()
def consumer():
while not stop.is_set():
with condition:
while not (queue or stop.is_set()):
condition.wait()
if stop.is_set():
break
interrupt.clear()
ns = queue.pop()
ss = gen_samples(ns) # iterator/fast
for b in grouper(ss, size/2):
if interrupt.is_set() or stop.is_set()
break
stream.write(b)
thread = threading.Thread(target=consumer)
thread.start()
producer()
我的问题如下:
这是线程安全的吗?我想特别指出我使用
is_set
没有锁或同步(在 for 循环中)。事件可以用布尔变量代替吗?我相信,因为两个线程(数据竞争)中的冲突写入都受到
condition
变量的保护。设置和检查事件之间存在竞争条件,但我认为它不会影响程序流程。是否有更高效的approach/algorithm利用来自
threading
模块的不同同步原语?
edit:发现并修复了
分析Python中的线程安全可以考虑全局解释器锁(GIL):没有两个线程会同时执行Python代码。对变量或对象字段的分配实际上是原子的(没有分配一半的变量)并且更改会立即有效地传播到其他线程。
这意味着您对 Event.is_set()
的使用已经等同于 使用普通布尔值。事件是由条件保护的布尔值。 is_set()
方法直接检查布尔值。 set()
方法获取条件,设置布尔值,并通知所有等待线程。 wait()
方法等待 set()
方法被调用。 clear()
方法获取条件并取消设置布尔值。由于您从不 wait()
任何事件,并且设置布尔值是原子的,因此事件中的条件实际上未被使用。
这可能会摆脱几个锁,但并不是真正的巨大效率胜利。 Condition 仍然是对锁的抽象,但是内置的 Queue 类型直接使用锁。因此,我假设内置队列的性能不亚于您的解决方案,即使对于单个消费者也是如此。
内置队列的主要问题是“在没有任何竞争的情况下持续获取和释放锁 [is] 低效”。这在两个方面是错误的:
- 由于 Python 的 GIL,在这两种情况下几乎没有竞争。
- 获取无竞争锁非常有效。
因此,虽然您的解决方案可能足够正确(我看不到出现死锁的机会),但它不太可能特别有效。 (只有一些小错误,比如使用 stop
而不是 stop.is_set()
和一些语法错误。)
如果您发现 Python 线程性能不佳,这可能是因为 CPython,而不是因为队列类型。我已经提到,由于 GIL,一次只能有一个线程 运行 。如果多个线程想要 运行,它们必须由操作系统调度并获取 GIL。每个线程将等待 5ms,然后再要求 运行ning 线程放弃 GIL(以与您的中断标志非常相似的方式)。然后线程可以做一些有用的工作,比如为一个不能被其他线程中断的关键部分获取锁。
可能,解决方案是避免 CPython 的线程。
- 如果您有多个 CPU 绑定任务,则必须使用多个进程。 CPython 的线程不会运行 并行。但是,进程之间的通信成本更高。
- 考虑是否可以直接组合生产者+消费者,可能使用生成器等功能。
- 为了更轻松地在同一个线程中处理多个任务,请考虑使用 async/await。事件循环由 asyncio 模块提供。这与 Python 的线程一样快,但需要注意的是任务不会相互抢占(中断)。但这可能是一个优势:由于任务只能在
await
处暂停,您不需要大多数锁,并且更容易推断代码的正确性。缺点是 async/await 可能比使用线程有更高的延迟。 - Python 有一个“执行器”的概念,它使得 运行 任务在单独的线程(对于 I/O-bound 任务)或单独的进程(对于 CPU-绑定任务)。
- 要在多个进程之间进行通信,请使用
multiprocessing
模块中的类型(例如队列、连接或值)。