python multiprocessing issues
I'm having some trouble with processes and queues. When I run the following code, the target function simply takes an item from the master queue and adds it to another queue specific to that process.
import sys
import multiprocessing
from Queue import Empty

# This is just taking a number from the queue
# and adding it to another queue
def my_callable(from_queue, to_queue):
    while True:
        try:
            tmp = from_queue.get(0)
            to_queue.put(tmp)
            print to_queue
        except Empty:
            break

# Create a master queue and fill it with numbers
main_queue = multiprocessing.Queue()
for i in xrange(100):
    main_queue.put(i)

all_queues = []
processes = []

# Create processes
for i in xrange(5):
    # Each process gets a queue that it will put numbers into
    queue = multiprocessing.Queue()
    # Keep up with the queue we are creating so we can get it later
    all_queues.append(queue)
    # Pass in our master queue and the queue we are transferring data to
    process = multiprocessing.Process(target=my_callable,
                                      args=(main_queue, queue))
    # Keep up with the processes
    processes.append(process)

for thread in processes:
    thread.start()

for thread in processes:
    thread.join()
When the target function prints the queue it is working with, you'll notice that almost only one queue is ever used. If you then collect the output and print it, you'll see that most of the numbers end up in a single queue.
def queue_get_all(q):
    items = []
    maxItemsToRetreive = 100
    for numOfItemsRetrieved in range(0, maxItemsToRetreive):
        try:
            if numOfItemsRetrieved == maxItemsToRetreive:
                break
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

for tmp in all_queues:
    print queue_get_all(tmp)
What is causing this? Is there something I should be doing in my code to balance the work these processes are doing?
Output:
[0, 2, 3, 4, 5, 6, 7, 8]
[1, 9, 10]
[11, 14, 15, 16]
[12, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
[13]
I think you have two problems here:
def my_callable(from_queue, to_queue):
    while True:
        try:
            tmp = from_queue.get(0)
            to_queue.put(tmp)
            print to_queue
        except Empty:
            break
From the documentation for get:
Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the Queue.Empty exception (timeout is ignored in that case).
Since you pass 0 as the first argument, it is equivalent to get(False). That makes the call non-blocking, meaning that if it can't fetch a value immediately it raises the Empty exception, which ends your worker process. Because all of your 'work' functions are identical and are trying to pull from the master queue at the same time, some of them may fail to get a value immediately and die.
Giving .get() a small timeout should fix this.
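To make that concrete (a small illustration of my own, not part of the original answer), these calls behave as follows on a multiprocessing.Queue:

tmp = from_queue.get(0)         # non-blocking: 0 is falsy, so block=False
tmp = from_queue.get(False)     # same thing: raises Empty if nothing is ready right now
tmp = from_queue.get(True, .1)  # blocking: waits up to 0.1s before raising Empty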
The second problem is that your 'work' function takes essentially zero time to finish. Pause briefly with sleep(.2) to simulate some non-trivial work and the items will be spread across all of the workers:
from time import sleep

def my_callable(from_queue, to_queue):
    while True:
        try:
            tmp = from_queue.get(True, .1)
            sleep(0.2)
            to_queue.put(tmp)
        except Empty:
            break
Edit:
Forgot to mention: in general, for this kind of problem it is better not to rely on the .get() timeout to signal the end of the queue. You get more control if you put some kind of "end of queue" marker object into the queue to tell the workers it is time to quit. That way you can let them all block while waiting either for new input or for the quit "command".
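A minimal sketch of that idea (my own illustration, not from the answer above; it reuses the my_callable/main_queue names from the question and picks an arbitrary 'STOP' string as the marker):

STOP = 'STOP'  # arbitrary sentinel value chosen for this sketch

def my_callable(from_queue, to_queue):
    while True:
        tmp = from_queue.get()   # block until something arrives
        if tmp == STOP:          # the marker tells this worker to quit
            break
        to_queue.put(tmp)

# After filling main_queue with the numbers, add one marker per worker
# so every process eventually sees a STOP and exits cleanly.
for i in xrange(5):
    main_queue.put(STOP)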