附加到同一个锁的两个条件变量,python2 和 python3 中的不同行为
two condition variables attached to the same lock, different behaviors in python2 and python3
我正在尝试在 python 中编写经典的生产者-消费者程序。
这是我引用的 c 代码:
http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture16.html
https://web.stanford.edu/~ouster/cgi-bin/cs140-spring14/lecture.php?topic=locks
在 pip install colored
和 pip3 install colored
之后
我 运行 这个程序在 lubuntu 18.04 上。
当运行宁为"python3 producer-consumer.py"
(即 运行 python 3.6.7)
该程序在
的几次迭代后挂起
"queue is empty, stop consuming"
或在
"queue is full, stop producing"
注意:ctrl-c 不会终止程序。
你需要按 ctrl-z 然后 kill -9 %1 来杀死它。
奇怪的是:当运行ning as "python producer-consumer.py"
(即 运行 python 2.7.15rc1)它几乎符合预期 运行s。
但是在 运行ning 足够长的时间后,它会在
处引发 IndexError 异常
queue.append(item)
或在
item = queue.pop(0)
在此之前,运行按预期持续了好几分钟:
3个生产者和3个各种颜色的消费者
在同一个小容量队列上工作,
经常碰到空队列和满队列的情况。
我怀疑不管我的程序是否正确,
python2 和 python3 中的不同行为似乎表明
python3 中有一个错误(也许 python2 中也有)
条件变量的实现?
或者对于某些有缺陷的程序,这种差异实际上是预期的吗?
提前致谢。
from threading import Thread, Lock, Condition
import time
from random import random, randint
import colored
from colored import stylize
queue = []
CAPACITY = 3
qlock = Lock()
item_ok = Condition(qlock)
space_ok = Condition(qlock)
class ProducerThread(Thread):
def run(self):
global queue
mycolor = self.name
while True:
qlock.acquire()
if len(queue) >= CAPACITY:
print(stylize('queue is full, stop producing', colored.fg(mycolor)))
while space_ok.wait():
pass
print(stylize('space available again, start producing', colored.fg(mycolor)))
item = chr(ord('A')+randint(0,25))
print(stylize('['+' '.join(queue)+'] <= '+item, colored.fg( mycolor)))
queue.append(item)
item_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
class ConsumerThread(Thread):
def run(self):
global queue
mycolor = self.name
while True:
qlock.acquire()
if not queue:
print(stylize('queue is empty, stop consuming', colored.fg(mycolor)))
while item_ok.wait():
pass
print(stylize('food is available, start consuming', colored.fg(mycolor)))
item = queue.pop(0)
print(stylize(item+' <= ['+' '.join(queue)+']', colored.fg( mycolor)))
space_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
ProducerThread(name='red').start()
ProducerThread(name='green').start()
ProducerThread(name='blue').start()
ConsumerThread(name='cyan').start()
ConsumerThread(name='magenta').start()
ConsumerThread(name='yellow').start()
主要问题是您的代码是在 通知线程后不检查列表是否为空/完整。在以下情况下可能会出现问题:
c1
和c2
是消费者线程,p1
是生产者线程。队列一开始是空的。 c1
处于唤醒状态(当前在最后一行 time.sleep...
),而 c2
正在等待通知(在行 while item_ok.wait():
.
p1
向队列添加一个项目并调用 item_ok.notify()
c1
完成等待并获取锁
c2
收到通知并尝试获取锁
c1
消费队列中的项并释放锁
c2
获取锁并尝试从空队列中弹出
解决方案
而不是在 while 条件下调用 .wait()
(这是无意义的,因为它总是 returns None
on Python 2 并且总是 True
在 Python 3.2+ 上,参见 here),在 while 循环体中调用 .wait()
并将队列是否未满/空的条件放入 while 循环条件中:
while not queue:
print('queue is empty, stop consuming')
item_ok.wait()
print('trying again')
通过使用这种方法(也用于上面链接的文档中),线程在被唤醒并获得锁后检查队列是否仍然不为空/满。如果条件不再满足(因为另一个线程在其间执行),线程将再次等待条件。
顺便说一下,上面描述的python 2 和3 之间的区别也是您的程序在两个版本上表现不同的原因。这是记录在案的行为,而不是实施中的错误。
生产者和消费者线程的固定代码(在过去 30 分钟内 运行 在我的机器上运行良好)看起来像这样(我删除了颜色,因为我不想安装包裹):
class ProducerThread(Thread):
def run(self):
global queue
while True:
qlock.acquire()
while len(queue) >= CAPACITY:
print('queue is full, stop producing')
space_ok.wait()
print('trying again')
item = chr(ord('A')+randint(0,25))
print('['+' '.join(queue)+'] <= '+item)
queue.append(item)
item_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
class ConsumerThread(Thread):
def run(self):
global queue
while True:
qlock.acquire()
while not queue:
print('queue is empty, stop consuming')
item_ok.wait()
print('trying again')
item = queue.pop(0)
print(item+' <= ['+' '.join(queue)+']')
space_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
奖金
您提到无法使用 Ctrl-C
(KeyboardInterrupt) 退出程序。要解决此问题,您可以使线程 "daemons" 这意味着它们会在主线程结束后立即退出。使用上面的代码,Ctrl-C
可以很好地结束程序:
ProducerThread(name='red', daemon=True).start()
ProducerThread(name='green', daemon=True).start()
ProducerThread(name='blue', daemon=True).start()
ConsumerThread(name='cyan', daemon=True).start()
ConsumerThread(name='magenta', daemon=True).start()
ConsumerThread(name='yellow', daemon=True).start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Exiting")
这是否解决了您的问题?请在下方留言。
我正在尝试在 python 中编写经典的生产者-消费者程序。 这是我引用的 c 代码: http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture16.html https://web.stanford.edu/~ouster/cgi-bin/cs140-spring14/lecture.php?topic=locks
在 pip install colored
和 pip3 install colored
之后
我 运行 这个程序在 lubuntu 18.04 上。
当运行宁为"python3 producer-consumer.py"
(即 运行 python 3.6.7)
该程序在
"queue is empty, stop consuming"
或在
"queue is full, stop producing"
注意:ctrl-c 不会终止程序。 你需要按 ctrl-z 然后 kill -9 %1 来杀死它。
奇怪的是:当运行ning as "python producer-consumer.py" (即 运行 python 2.7.15rc1)它几乎符合预期 运行s。 但是在 运行ning 足够长的时间后,它会在
处引发 IndexError 异常queue.append(item)
或在
item = queue.pop(0)
在此之前,运行按预期持续了好几分钟: 3个生产者和3个各种颜色的消费者 在同一个小容量队列上工作, 经常碰到空队列和满队列的情况。
我怀疑不管我的程序是否正确, python2 和 python3 中的不同行为似乎表明 python3 中有一个错误(也许 python2 中也有) 条件变量的实现? 或者对于某些有缺陷的程序,这种差异实际上是预期的吗? 提前致谢。
from threading import Thread, Lock, Condition
import time
from random import random, randint
import colored
from colored import stylize
queue = []
CAPACITY = 3
qlock = Lock()
item_ok = Condition(qlock)
space_ok = Condition(qlock)
class ProducerThread(Thread):
def run(self):
global queue
mycolor = self.name
while True:
qlock.acquire()
if len(queue) >= CAPACITY:
print(stylize('queue is full, stop producing', colored.fg(mycolor)))
while space_ok.wait():
pass
print(stylize('space available again, start producing', colored.fg(mycolor)))
item = chr(ord('A')+randint(0,25))
print(stylize('['+' '.join(queue)+'] <= '+item, colored.fg( mycolor)))
queue.append(item)
item_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
class ConsumerThread(Thread):
def run(self):
global queue
mycolor = self.name
while True:
qlock.acquire()
if not queue:
print(stylize('queue is empty, stop consuming', colored.fg(mycolor)))
while item_ok.wait():
pass
print(stylize('food is available, start consuming', colored.fg(mycolor)))
item = queue.pop(0)
print(stylize(item+' <= ['+' '.join(queue)+']', colored.fg( mycolor)))
space_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
ProducerThread(name='red').start()
ProducerThread(name='green').start()
ProducerThread(name='blue').start()
ConsumerThread(name='cyan').start()
ConsumerThread(name='magenta').start()
ConsumerThread(name='yellow').start()
主要问题是您的代码是在 通知线程后不检查列表是否为空/完整。在以下情况下可能会出现问题:
c1
和c2
是消费者线程,p1
是生产者线程。队列一开始是空的。 c1
处于唤醒状态(当前在最后一行 time.sleep...
),而 c2
正在等待通知(在行 while item_ok.wait():
.
p1
向队列添加一个项目并调用item_ok.notify()
c1
完成等待并获取锁c2
收到通知并尝试获取锁c1
消费队列中的项并释放锁c2
获取锁并尝试从空队列中弹出
解决方案
而不是在 while 条件下调用 .wait()
(这是无意义的,因为它总是 returns None
on Python 2 并且总是 True
在 Python 3.2+ 上,参见 here),在 while 循环体中调用 .wait()
并将队列是否未满/空的条件放入 while 循环条件中:
while not queue:
print('queue is empty, stop consuming')
item_ok.wait()
print('trying again')
通过使用这种方法(也用于上面链接的文档中),线程在被唤醒并获得锁后检查队列是否仍然不为空/满。如果条件不再满足(因为另一个线程在其间执行),线程将再次等待条件。
顺便说一下,上面描述的python 2 和3 之间的区别也是您的程序在两个版本上表现不同的原因。这是记录在案的行为,而不是实施中的错误。
生产者和消费者线程的固定代码(在过去 30 分钟内 运行 在我的机器上运行良好)看起来像这样(我删除了颜色,因为我不想安装包裹):
class ProducerThread(Thread):
def run(self):
global queue
while True:
qlock.acquire()
while len(queue) >= CAPACITY:
print('queue is full, stop producing')
space_ok.wait()
print('trying again')
item = chr(ord('A')+randint(0,25))
print('['+' '.join(queue)+'] <= '+item)
queue.append(item)
item_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
class ConsumerThread(Thread):
def run(self):
global queue
while True:
qlock.acquire()
while not queue:
print('queue is empty, stop consuming')
item_ok.wait()
print('trying again')
item = queue.pop(0)
print(item+' <= ['+' '.join(queue)+']')
space_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
奖金
您提到无法使用 Ctrl-C
(KeyboardInterrupt) 退出程序。要解决此问题,您可以使线程 "daemons" 这意味着它们会在主线程结束后立即退出。使用上面的代码,Ctrl-C
可以很好地结束程序:
ProducerThread(name='red', daemon=True).start()
ProducerThread(name='green', daemon=True).start()
ProducerThread(name='blue', daemon=True).start()
ConsumerThread(name='cyan', daemon=True).start()
ConsumerThread(name='magenta', daemon=True).start()
ConsumerThread(name='yellow', daemon=True).start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Exiting")
这是否解决了您的问题?请在下方留言。