Python Queue usage works in threading but (apparently) not in multiprocessing
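Many tutorials on multiprocessing don't seem to fully explain why the technique below works for threading but not for multiprocessing.
Why doesn't this work for multiprocessing, and what is the correct implementation of what I am trying to do? Thank you!
Threading implementation (works fine, and makes sense to me):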
from threading import Thread
from Queue import Queue
from time import sleep

"""threading functions"""
def producer_thread(n):
    for x in range(10):
        thread_q.put(n)

def consumer_thread():
    while True:
        item = thread_q.get()
        print item

if __name__ == '__main__':
    thread_q = Queue()
    """works fine"""
    p_thread = Thread(target=producer_thread, args=(10,))
    c_thread = Thread(target=consumer_thread)
    c_thread.daemon=True
    p_thread.start(); c_thread.start()
    p_thread.join()
    """prevents c_thread daemon process from cancelling prematurely"""
    sleep(.001)
Output:
10
10
10
10
10
10
10
10
10
10
Multiprocessing implementation (seems identical to the threading one, but doesn't work at all):
from multiprocessing import Process, freeze_support
from Queue import Queue

"""multiprocessing functions"""
def producer_process(n):
    for x in range(10):
        process_q.put(n)

def consumer_process():
    while True:
        item = process_q.get()
        print item
#
if __name__ == '__main__':
    freeze_support()
    process_q = Queue()
    """computer explodes"""
    p_process = Process(target=producer_process, args=(10,))
    c_process = Process(target=consumer_process)
    c_process.daemon=True
    p_process.start(); c_process.start()
    p_process.join()
Output:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
self = load(from_parent)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
dispatch[key](self)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
self = load(from_parent)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
dispatch[key](self)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Process Process-33:
Traceback (most recent call last):
File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
item = q.get()
NameError: global name 'q' is not defined
Process Process-32:
Traceback (most recent call last):
File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
q.put(n)
NameError: global name 'q' is not defined
Process Process-34:
Traceback (most recent call last):
File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
q.put(n)
NameError: global name 'q' is not defined
Process Process-35:
Traceback (most recent call last):
File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
item = q.get()
NameError: global name 'q' is not defined
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
self = load(from_parent)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
dispatch[key](self)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'consumer'
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
self = load(from_parent)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
dispatch[key](self)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'producer'
When I run the threading version, I get:
File "test.py", line 18, in <module>
p_thread = Thread(target=producer, args=(10,))
NameError: name 'producer' is not defined
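Also, I think this error in the multiprocessing version
NameError: global name 'q' is not defined
must come from a typo. Nothing named "q" seems to be defined anywhere in the posted code.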
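EDIT:
I have now run the threading version and found that it prints fewer than ten "10"s: usually four or so, and the count varies randomly from run to run. I am using Python 2.7.5. Could you check this?
EDIT
I ran the mp version; there was no output and no error message, and the program terminated quickly. I believe there is a problem in the logic that cannot be ignored. I think fixing the threading version first may help you a lot.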
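The shortfall reported in the edit above is consistent with a race: the consumer is a daemon thread, and the main thread only waits `sleep(.001)` before exiting, so the consumer can be killed before it has printed everything. A minimal sketch of one possible fix follows, assuming Python 3 (hence `queue` rather than `Queue`). The `SENTINEL` marker, the `seen` list, and the `run` helper are hypothetical names that are not part of the original post.

```python
from queue import Queue
from threading import Thread

SENTINEL = None  # hypothetical end-of-stream marker, not in the original post


def producer_thread(q, n, count=10):
    """Put n on the queue count times, then signal that nothing more is coming."""
    for _ in range(count):
        q.put(n)
    q.put(SENTINEL)


def consumer_thread(q, seen):
    """Collect items until the sentinel arrives, then return."""
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        seen.append(item)


def run():
    q = Queue()
    seen = []
    p_thread = Thread(target=producer_thread, args=(q, 10))
    c_thread = Thread(target=consumer_thread, args=(q, seen))
    p_thread.start()
    c_thread.start()
    # Joining both threads replaces the daemon flag and the sleep() guess.
    p_thread.join()
    c_thread.join()
    return seen


if __name__ == '__main__':
    for item in run():
        print(item)
```

Because the consumer exits on its own, it no longer needs to be a daemon, and the main thread can wait for it with `join()` instead of guessing a delay.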
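OK, (sorry(?)) to answer my own question: I found a working implementation of what I was trying to do. There seem to be quite a few nuances.
First, multiprocessing requires a JoinableQueue rather than a standard Queue.
Second, since the multiprocessing functions modify the queue in place, the queue needs to be passed to the functions as an argument. Maybe this should have been obvious, but I evidently overlooked it.
Third, and perhaps most importantly, the threads do not print to the interpreter's stdout; they print to the Windows stdout, so you must run the script from the command line if you want to see the output.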
"""multiprocessing functions"""
def producer_process(n, q):
    for x in range(10):
        q.put(n)

def consumer_process(q):
    while True:
        item = q.get()
        print item
        q.task_done()

if __name__ == '__main__':
    from multiprocessing import Process, freeze_support, JoinableQueue
    freeze_support()
    process_q = JoinableQueue()
    '''launch consumer process'''
    c_process = Process(target=consumer_process, args=(process_q,))
    c_process.daemon = True
    c_process.start()
    '''launch producer process'''
    p_process = Process(target=producer_process, args=(10, process_q))
    p_process.start()
    p_process.join()
    process_q.join()
    print "Done"
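On the point about passing the queue as an argument: on Windows a child process does not inherit the parent's memory. It starts a fresh interpreter and re-imports the script under a different module name, so anything assigned inside the `if __name__ == '__main__':` block never exists in the child. The sketch below imitates that re-import inside a single process with `runpy`, assuming Python 3, where the child-side module name is `__mp_main__`. The embedded `SCRIPT` and the `names_defined` helper are hypothetical and exist only for illustration.

```python
import os
import runpy
import tempfile

# Hypothetical script modelled on the question: the queue is created
# only inside the __main__ guard, and the worker reads it as a global.
SCRIPT = '''
from queue import Queue

def consumer_process():
    return process_q.get()

if __name__ == '__main__':
    process_q = Queue()
'''


def names_defined(run_name):
    """Execute SCRIPT under the given module name; return the global names it defines."""
    fd, path = tempfile.mkstemp(suffix=".py")
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(SCRIPT)
        return set(runpy.run_path(path, run_name=run_name))
    finally:
        os.remove(path)


as_parent = names_defined("__main__")     # how the parent runs the script
as_child = names_defined("__mp_main__")   # roughly how a spawned child sees it

print("process_q" in as_parent)           # True
print("process_q" in as_child)            # False
print("consumer_process" in as_child)     # True
```

The function is visible in the child but the global queue is not, which is consistent with the `NameError: global name 'q' is not defined` lines in the traceback above.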
import Queue is for multi-threaded applications: https://docs.python.org/2/library/queue.html It is not meant for multi-process applications.
from multiprocessing import Queue is for multi-process applications: https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes
According to the documentation, multiprocessing.Queue "is a near clone of Queue.Queue"
Besides multiprocessing.Queue there is also JoinableQueue, which adds the task_done() and join() methods in case you need them.
In your example, I don't think you need JoinableQueue. Have you tried this:
from multiprocessing import (Process, Queue, freeze_support)

def producer(q, n):
    for x in range(n):
        q.put(x)
    q.put("end")

def consumer(q):
    while True:
        item = q.get()
        if item == "end":
            break
        print item

if __name__ == '__main__':
    freeze_support()
    q = Queue()
    c = Process(target=consumer, args=(q,))
    c.start()
    p = Process(target=producer, args=(q, 10))
    p.start()
    c.join()
Tested on Linux and Windows.
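As a footnote to the distinction between the thread-oriented queue and multiprocessing.Queue: one concrete symptom is that the thread-oriented queue cannot be pickled, and pickling is how arguments reach a child process on Windows. The sketch below only demonstrates that property, assuming Python 3 (module `queue`); the `can_pickle` helper is a hypothetical name introduced for this example.

```python
import pickle
import queue


def can_pickle(obj):
    """Return True if obj can be serialized with pickle, False otherwise."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


print(can_pickle([10] * 10))       # True: plain data pickles fine
print(can_pickle(queue.Queue()))   # False: the queue holds thread locks
```

A multiprocessing.Queue is instead built on a pipe plus process-aware locks, which is why it can be handed to `Process(..., args=(q,))` as in the answer above.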