为什么即使完成对池的映射调用,进程也需要很长时间才能加入?

Why do processes take long time to join even when the map call to pool is completed?

这是关于 Python 3.5 中的 multiprocessing 模块的另一个问题。我的问题是我知道所有的分叉处理都完成了他们的工作(我可以在 Queue 中看到他们的结果), AsyncResult.result() returns True 这意味着工作已经完成但是当我继续 PoolObj.join(),这需要很长时间。我知道我可以 PoolObj.terminate() 继续我的生活,但我想知道为什么会发生这种情况?

我正在使用以下代码:

def worker(d):
    queue.put(d)

def gen_data():
    for i in range(int(1e6)):
        yield i

if __name__ == "__main__":
    queue = Queue(maxsize=-1)
    pool = Pool(processes=12)
    pool_obj_worker = pool.map_async(worker, gen_data(), chunksize=1)
    pool.close()

    print ('Lets run the workers...\n')
    while True:
        if pool_obj_worker.ready():
            if pool_obj_worker.successful():
                print ('\nAll processed successfully!') # I can see this quickly, so my jobs are done
            else:
                print ('\nAll processed. Errors encountered!')
            sys.stdout.flush()
            print (q.qsize()) # The size is right that means all workers have done their job
            pool.join() # will get stuck here for long long time
            queue.put('*')           
            break
    print ('%d still to be processed' %
           pool_obj_worker._number_left)
    sys.stdout.flush()
    time.sleep(0.5)

我做错了吗?请赐教。还是持有 join() 的进程已经变成僵尸?

这里的问题是您在 worker 中使用了额外的 Queue,而不是 Pool 提供的那个。 当进程完成它们的工作时,它们将全部加入 multiprocessing.Queue 中使用的 FeederThread 并且这些调用将挂起(可能是因为所有线程同时调用 join 并且可能存在一些奇怪的竞争条件,调查起来并不容易)。

添加 multiprocessing.util.log_to_stderr(10) 允许显示您的进程在加入队列供给器线程时挂起。

要解决您的问题,您可以使用 multiprocessing.SimpleQueue 而不是 multiprocessing.Queue(没有挂起,因为没有馈线)或尝试使用提供的方法 pool.unordered_imap与您似乎实现的行为相同(返回包含 worker 返回结果的无序生成器)。