为什么即使完成对池的映射调用,进程也需要很长时间才能加入?
Why do processes take long time to join even when the map call to pool is completed?
这是关于 Python 3.
5 中的 multiprocessing
模块的另一个问题。我的问题是我知道所有的分叉处理都完成了他们的工作(我可以在 Queue
中看到他们的结果), AsyncResult.result() returns True 这意味着工作已经完成但是当我继续 PoolObj.join(),这需要很长时间。我知道我可以 PoolObj.terminate() 继续我的生活,但我想知道为什么会发生这种情况?
我正在使用以下代码:
def worker(d):
queue.put(d)
def gen_data():
for i in range(int(1e6)):
yield i
if __name__ == "__main__":
queue = Queue(maxsize=-1)
pool = Pool(processes=12)
pool_obj_worker = pool.map_async(worker, gen_data(), chunksize=1)
pool.close()
print ('Lets run the workers...\n')
while True:
if pool_obj_worker.ready():
if pool_obj_worker.successful():
print ('\nAll processed successfully!') # I can see this quickly, so my jobs are done
else:
print ('\nAll processed. Errors encountered!')
sys.stdout.flush()
print (q.qsize()) # The size is right that means all workers have done their job
pool.join() # will get stuck here for long long time
queue.put('*')
break
print ('%d still to be processed' %
pool_obj_worker._number_left)
sys.stdout.flush()
time.sleep(0.5)
我做错了吗?请赐教。还是持有 join()
的进程已经变成僵尸?
这里的问题是您在 worker 中使用了额外的 Queue
,而不是 Pool
提供的那个。
当进程完成它们的工作时,它们将全部加入 multiprocessing.Queue
中使用的 FeederThread
并且这些调用将挂起(可能是因为所有线程同时调用 join
并且可能存在一些奇怪的竞争条件,调查起来并不容易)。
添加 multiprocessing.util.log_to_stderr(10)
允许显示您的进程在加入队列供给器线程时挂起。
要解决您的问题,您可以使用 multiprocessing.SimpleQueue
而不是 multiprocessing.Queue
(没有挂起,因为没有馈线)或尝试使用提供的方法 pool.unordered_imap
与您似乎实现的行为相同(返回包含 worker 返回结果的无序生成器)。
这是关于 Python 3.
5 中的 multiprocessing
模块的另一个问题。我的问题是我知道所有的分叉处理都完成了他们的工作(我可以在 Queue
中看到他们的结果), AsyncResult.result() returns True 这意味着工作已经完成但是当我继续 PoolObj.join(),这需要很长时间。我知道我可以 PoolObj.terminate() 继续我的生活,但我想知道为什么会发生这种情况?
我正在使用以下代码:
def worker(d):
queue.put(d)
def gen_data():
for i in range(int(1e6)):
yield i
if __name__ == "__main__":
queue = Queue(maxsize=-1)
pool = Pool(processes=12)
pool_obj_worker = pool.map_async(worker, gen_data(), chunksize=1)
pool.close()
print ('Lets run the workers...\n')
while True:
if pool_obj_worker.ready():
if pool_obj_worker.successful():
print ('\nAll processed successfully!') # I can see this quickly, so my jobs are done
else:
print ('\nAll processed. Errors encountered!')
sys.stdout.flush()
print (q.qsize()) # The size is right that means all workers have done their job
pool.join() # will get stuck here for long long time
queue.put('*')
break
print ('%d still to be processed' %
pool_obj_worker._number_left)
sys.stdout.flush()
time.sleep(0.5)
我做错了吗?请赐教。还是持有 join()
的进程已经变成僵尸?
这里的问题是您在 worker 中使用了额外的 Queue
,而不是 Pool
提供的那个。
当进程完成它们的工作时,它们将全部加入 multiprocessing.Queue
中使用的 FeederThread
并且这些调用将挂起(可能是因为所有线程同时调用 join
并且可能存在一些奇怪的竞争条件,调查起来并不容易)。
添加 multiprocessing.util.log_to_stderr(10)
允许显示您的进程在加入队列供给器线程时挂起。
要解决您的问题,您可以使用 multiprocessing.SimpleQueue
而不是 multiprocessing.Queue
(没有挂起,因为没有馈线)或尝试使用提供的方法 pool.unordered_imap
与您似乎实现的行为相同(返回包含 worker 返回结果的无序生成器)。