Weird behaviour with numpy and multiprocessing.process
Sorry for the long code; I have tried to make it as simple and reproducible as possible.
In short, this Python script starts four processes that distribute numbers into lists. Each result is then put on a multiprocessing.Queue().
import random
import multiprocessing
import numpy
import sys
def work(subarray, queue):
    result = [numpy.array([], dtype=numpy.uint64) for i in range(0, 4)]
    for element in numpy.nditer(subarray):
        index = random.randint(0, 3)
        result[index] = numpy.append(result[index], element)
    queue.put(result)
    print "after the queue.put"
jobs = []
queue = multiprocessing.Queue()
subarray = numpy.array_split(numpy.arange(1, 10001, dtype=numpy.uint64), 4)
for i in range(0, 4):
    process = multiprocessing.Process(target=work, args=(subarray[i], queue))
    jobs.append(process)
    process.start()
for j in jobs:
    j.join()
print "the end"
All processes reach the print "after the queue.put" line. However, the script never reaches the print "the end" line. Strangely, if I change the arange from 10001 to 1001, it does terminate. What is happening?
I will expand my comment into a short answer. Since I do not understand the weird behaviour either, this is only a workaround.
A first observation: if the queue.put line is commented out, the code runs to the end, so this must be a queue-related problem. The results are actually added to the queue, so the problem must lie in the interplay between the queue and join.
The following code works as expected:
import random
import multiprocessing
import numpy
import sys
import time
def work(subarray, queue):
    result = [numpy.array([], dtype=numpy.uint64) for i in range(4)]
    for element in numpy.nditer(subarray):
        index = random.randint(0, 3)
        result[index] = numpy.append(result[index], element)
    queue.put(result)
    print("after the queue.put")
jobs = []
queue = multiprocessing.Queue()
subarray = numpy.array_split(numpy.arange(1, 15001, dtype=numpy.uint64), 4)
for i in range(4):
    process = multiprocessing.Process(target=work, args=(subarray[i], queue))
    jobs.append(process)
    process.start()
res = []
while len(res) < 4:
    res.append(queue.get())
print("the end")
Most of the child processes block on the put call. The documentation for multiprocessing queue put says it will "block if necessary until a free slot is available." This can be avoided by draining the queue with queue.get() calls before joining.
Also, in multiprocessing code, isolate the parent process with:
if __name__ == '__main__':
    # main code here
See Compulsory usage of if __name__ == "__main__" in windows while using multiprocessing.
This is the reason:
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.