python multiprocessing.Queue 没有输入所有值
python multiprocessing.Queue not putting through all the values
我有一些 multiprocessing.Queues 列表可以在两个进程之间进行通信。我想发送一个“None”作为每个队列的最后一个值,以向第二个进程指示数据流的结束,但这似乎并不总是有效(我得到 None 在一些队列中,但不是在每个队列中),除非我在 put() 指令之一之后添加至少一个 print()。
澄清:有时没有印刷品也能工作,但并非总是如此。此外,当我输入打印说明时,到目前为止 100% 的时间都有效。
我还尝试为 put() 方法设置 block=True,但这似乎没有任何区别。
我在尝试调试问题时找到了这个解决方案,以查明我在将值放入队列或获取它们时是否遇到问题,但是当我将 print() 放在 put() 上时一边,代码总是有效的。
编辑:
一个简化但完整的版本,部分重现了问题:我已经确定了两个可能有问题的部分,在代码中标记为 CODEBLOCK1 和 CODEBLOCK2:如果我取消注释其中任何一个,代码将按预期工作。
minimal_example.py:
import multiprocessing, processes
def MainProcess():
multiprocessing.set_start_method("spawn")
metricsQueue = multiprocessing.Queue() # Virtually infinite size
# Define and start the parallel processes
process1 = multiprocessing.Process(target=processes.Process1,
args=(metricsQueue,))
process2 = multiprocessing.Process(target=processes.Process2,
args=(metricsQueue,))
process1.start()
process2.start()
process1.join()
process2.join()
# Script entry point
if __name__ == '__main__':
MainProcess()
processes.py:
import random, queue
def Process1(metricsQueue):
print("Start of process 1")
# Cancel join for the queues, so that upon killing this process, the main process does not block on join if there
# are still elements on the queues -> We don't mind losing data if the process is killed.
# Start of CODEBLOCK1
metricsQueue.cancel_join_thread()
# End of CODEBLOCK1
longData = random.sample(range(10205, 26512), 992)
# Start of CODEBLOCK2
# Put a big number of data in the queue
for data in longData:
try:
metricsQueue.put(data, block=False)
except queue.Full:
print("Error")
# End of CODEBLOCK2
# Once finished, push a None through all queues to mark the end of the process
try:
metricsQueue.put(None, block=False)
print("put None in metricsQueue")
except queue.Full:
print("Error")
print("End of process 1")
def Process2(metricsQueue):
print("Start of process 2")
newMetricsPoint = 0
recoveredMetrics = []
while (newMetricsPoint is not None):
# Metrics point
try:
newMetricsPoint = metricsQueue.get(block=False)
except queue.Empty:
pass
else:
if (newMetricsPoint is not None):
recoveredMetrics.append(newMetricsPoint)
print(f"got {len(recoveredMetrics)} points so far")
else:
print("get None from metricsQueue")
print("End of process 2")
这段代码给出的结果是这样的,第二个过程永远不会结束,因为卡在了 wile 循环中:
Start of process 1
Start of process 2
put None in metricsQueue 0
End of process 1
如果我注释 CODEBLOCK1 或 CODEBLOCK2,代码将按预期工作:
Start of process 1
Start of process 2
put None in metricsQueue 0
End of process 1
get None from metricsQueue 0
End of process 2
没关系,我发现了问题。
原来我误解了 queue.cancel_join_thread()
的作用。
这使得进程 1 在发送完所有数据后完成,即使队列中还有一些数据要被我的第二个进程使用。这会导致所有未使用的数据被刷新,因此丢失,永远不会到达我的第二个进程。
We don't mind losing data if the process is killed.
这个假设是不正确的。关闭信号 None
是数据的一部分;丢失它会阻止兄弟进程关闭。
如果进程依赖于关闭信号,请不要.cancel_join_thread()
用于发送该信号的队列。
我有一些 multiprocessing.Queues 列表可以在两个进程之间进行通信。我想发送一个“None”作为每个队列的最后一个值,以向第二个进程指示数据流的结束,但这似乎并不总是有效(我得到 None 在一些队列中,但不是在每个队列中),除非我在 put() 指令之一之后添加至少一个 print()。
澄清:有时没有印刷品也能工作,但并非总是如此。此外,当我输入打印说明时,到目前为止 100% 的时间都有效。
我还尝试为 put() 方法设置 block=True,但这似乎没有任何区别。
我在尝试调试问题时找到了这个解决方案,以查明我在将值放入队列或获取它们时是否遇到问题,但是当我将 print() 放在 put() 上时一边,代码总是有效的。
编辑: 一个简化但完整的版本,部分重现了问题:我已经确定了两个可能有问题的部分,在代码中标记为 CODEBLOCK1 和 CODEBLOCK2:如果我取消注释其中任何一个,代码将按预期工作。
minimal_example.py:
import multiprocessing, processes
def MainProcess():
multiprocessing.set_start_method("spawn")
metricsQueue = multiprocessing.Queue() # Virtually infinite size
# Define and start the parallel processes
process1 = multiprocessing.Process(target=processes.Process1,
args=(metricsQueue,))
process2 = multiprocessing.Process(target=processes.Process2,
args=(metricsQueue,))
process1.start()
process2.start()
process1.join()
process2.join()
# Script entry point
if __name__ == '__main__':
MainProcess()
processes.py:
import random, queue
def Process1(metricsQueue):
print("Start of process 1")
# Cancel join for the queues, so that upon killing this process, the main process does not block on join if there
# are still elements on the queues -> We don't mind losing data if the process is killed.
# Start of CODEBLOCK1
metricsQueue.cancel_join_thread()
# End of CODEBLOCK1
longData = random.sample(range(10205, 26512), 992)
# Start of CODEBLOCK2
# Put a big number of data in the queue
for data in longData:
try:
metricsQueue.put(data, block=False)
except queue.Full:
print("Error")
# End of CODEBLOCK2
# Once finished, push a None through all queues to mark the end of the process
try:
metricsQueue.put(None, block=False)
print("put None in metricsQueue")
except queue.Full:
print("Error")
print("End of process 1")
def Process2(metricsQueue):
print("Start of process 2")
newMetricsPoint = 0
recoveredMetrics = []
while (newMetricsPoint is not None):
# Metrics point
try:
newMetricsPoint = metricsQueue.get(block=False)
except queue.Empty:
pass
else:
if (newMetricsPoint is not None):
recoveredMetrics.append(newMetricsPoint)
print(f"got {len(recoveredMetrics)} points so far")
else:
print("get None from metricsQueue")
print("End of process 2")
这段代码给出的结果是这样的,第二个过程永远不会结束,因为卡在了 wile 循环中:
Start of process 1
Start of process 2
put None in metricsQueue 0
End of process 1
如果我注释 CODEBLOCK1 或 CODEBLOCK2,代码将按预期工作:
Start of process 1
Start of process 2
put None in metricsQueue 0
End of process 1
get None from metricsQueue 0
End of process 2
没关系,我发现了问题。
原来我误解了 queue.cancel_join_thread()
的作用。
这使得进程 1 在发送完所有数据后完成,即使队列中还有一些数据要被我的第二个进程使用。这会导致所有未使用的数据被刷新,因此丢失,永远不会到达我的第二个进程。
We don't mind losing data if the process is killed.
这个假设是不正确的。关闭信号 None
是数据的一部分;丢失它会阻止兄弟进程关闭。
如果进程依赖于关闭信号,请不要.cancel_join_thread()
用于发送该信号的队列。