python multiprocessing.Queue 没有输入所有值

python multiprocessing.Queue not putting through all the values

我有一些 multiprocessing.Queues 列表可以在两个进程之间进行通信。我想发送一个“None”作为每个队列的最后一个值,以向第二个进程指示数据流的结束,但这似乎并不总是有效(我得到 None 在一些队列中,但不是在每个队列中),除非我在 put() 指令之一之后添加至少一个 print()。

澄清:有时没有印刷品也能工作,但并非总是如此。此外,当我输入打印说明时,到目前为止 100% 的时间都有效。
我还尝试为 put() 方法设置 block=True,但这似乎没有任何区别。

我在尝试调试问题时找到了这个解决方案,以查明我在将值放入队列或获取它们时是否遇到问题,但是当我将 print() 放在 put() 上时一边,代码总是有效的。

编辑: 一个简化但完整的版本,部分重现了问题:我已经确定了两个可能有问题的部分,在代码中标记为 CODEBLOCK1 和 CODEBLOCK2:如果我取消注释其中任何一个,代码将按预期工作。

minimal_example.py:

import multiprocessing, processes


def MainProcess():

    multiprocessing.set_start_method("spawn")
    metricsQueue = multiprocessing.Queue() # Virtually infinite size

    # Define and start the parallel processes
    process1 = multiprocessing.Process(target=processes.Process1,
                                        args=(metricsQueue,))

    process2 = multiprocessing.Process(target=processes.Process2,
                                        args=(metricsQueue,))

    process1.start()
    process2.start()

    process1.join()
    process2.join()


# Script entry point
if __name__ == '__main__':

    MainProcess()

processes.py:

import random, queue

def Process1(metricsQueue):

    print("Start of process 1")

    # Cancel join for the queues, so that upon killing this process, the main process does not block on join if there
    # are still elements on the queues -> We don't mind losing data if the process is killed.
    # Start of CODEBLOCK1
    metricsQueue.cancel_join_thread()
    # End of CODEBLOCK1

    longData = random.sample(range(10205, 26512), 992)

    # Start of CODEBLOCK2
    # Put a big number of data in the queue
    for data in longData:
        try:
            metricsQueue.put(data, block=False)

        except queue.Full:
            print("Error")
    # End of CODEBLOCK2


    # Once finished, push a None through all queues to mark the end of the process
    try:
        metricsQueue.put(None, block=False)
        print("put None in metricsQueue")

    except queue.Full:
        print("Error")

    print("End of process 1")



def Process2(metricsQueue):

    print("Start of process 2")

    newMetricsPoint = 0
    recoveredMetrics = []

    while (newMetricsPoint is not None):

        # Metrics point
        try:
            newMetricsPoint = metricsQueue.get(block=False)

        except queue.Empty:
            pass

        else:
            if (newMetricsPoint is not None):
                recoveredMetrics.append(newMetricsPoint)
                print(f"got {len(recoveredMetrics)} points so far")

            else:
                print("get None from metricsQueue")

    print("End of process 2")

这段代码给出的结果是这样的,第二个过程永远不会结束,因为卡在了 wile 循环中:

Start of process 1
Start of process 2
put None in metricsQueue 0
End of process 1

如果我注释 CODEBLOCK1 或 CODEBLOCK2,代码将按预期工作:

Start of process 1
Start of process 2
put None in metricsQueue 0
End of process 1
get None from metricsQueue 0
End of process 2

没关系,我发现了问题。 原来我误解了 queue.cancel_join_thread() 的作用。 这使得进程 1 在发送完所有数据后完成,即使队列中还有一些数据要被我的第二个进程使用。这会导致所有未使用的数据被刷新,因此丢失,永远不会到达我的第二个进程。

We don't mind losing data if the process is killed.

这个假设是不正确的。关闭信号 None 是数据的一部分;丢失它会阻止兄弟进程关闭。

如果进程依赖于关闭信号,请不要.cancel_join_thread()用于发送该信号的队列。