Python multiprocessing:如何启动相互依赖的进程?

Python multiprocessing: How to start processes that depend on each other?

我有一个基本问题困扰着 Python 多处理方法,即如何以最佳方式启动使用队列传输数据的不同进程。

为此,我使用了一个简单的示例,其中

  1. 收到数据
  2. 数据已处理
  3. 数据发送中

所有上面的步骤应该通过三个不同的过程并行发生。

这里是示例代码:

import multiprocessing
import keyboard
import time

def getData(queue_raw):
    for num in range(1000):
        queue_raw.put(num)
        print("getData: put "+ str(num)+" in queue_raw")
    while True:
        if keyboard.read_key() == "s":
            break

def calcFeatures(queue_raw, queue_features):
    while not queue_raw.empty():
        data = queue_raw.get()
        queue_features.put(data**2)
        print("calcFeatures: put "+ str(data**2)+" in queue_features")

def sendFeatures(queue_features):
    while not queue_features.empty():
        feature = queue_features.get()
        print("sendFeatures: put "+ str(feature)+" out")

if __name__ == "__main__":

    queue_raw = multiprocessing.Queue()
    queue_features = multiprocessing.Queue()

    processes = [

        multiprocessing.Process(target=getData, args=(queue_raw,)),
        multiprocessing.Process(target=calcFeatures, args=(queue_raw, queue_features,)),
        multiprocessing.Process(target=sendFeatures, args=(queue_features,))
    ]

    processes[0].start()
    time.sleep(0.1)
    processes[1].start()
    time.sleep(0.1)
    processes[2].start()

    #for p in processes:
    #    p.start()
    for p in processes:
        p.join()

这个程序有效,但我的问题是关于不同进程的启动。 理想情况下,只有 process[0] 将数据放入 queue_raw 时,process[1] 才应该启动;而 process[2] 只有在 process[1] 将计算的特征放入 queue_features 时才应该开始。

现在我通过 time.sleep() 函数做到了这一点,这是次优的,因为我不一定知道这些过程需要多长时间。 我也试过类似的东西:

processes[0].start()
while queue_raw.empty():
    time.sleep(0.5)
processes[1].start()

但是不行,因为估计只有第一个进程。有什么方法可以完成这个过程取决于开始吗?

@moooeeeeep 指出了正确的评论。 使用 while not queue.empty(): 检查不是等到数据实际在队列中!

通过哨兵对象(这里是 None)和 while True 循环的方法将强制进程等待直到其他进程将数据放入队列:

FLAG_STOP=False
while FLAG_STOP is False:
    data = queue_raw.get()  # get will wait
    if data is None:
        # Finish analysis
        FLAG_STOP = True
    else:
        # work with data