process.join() hangs when using Queue based multi process
The following code is intended to create worker processes that share the images data and process images until the work is done. The processes are also meant to terminate once the input queue q becomes empty.
import multiprocessing
import numpy as np

def run_mp(images, f_work, n_worker):
    q = multiprocessing.Queue()
    for img_idx in range(len(images)):
        q.put(img_idx)
    result_q = multiprocessing.Queue()

    def f_worker(worker_index, data, q, result_q):
        print("worker {} started".format(worker_index))
        while not q.empty():
            image_idx = q.get(timeout=1)
            print('processing image idx {}'.format(image_idx))
            image_out = f_work(data, image_idx)
            result_q.put((image_out, image_idx))
        print("worker {} finished".format(worker_index))
        return

    processes = list()
    for i in range(n_worker):
        process = multiprocessing.Process(target=f_worker, args=(i, images, q, result_q))
        process.daemon = True
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

images = [np.random.randn(100, 100) for _ in range(20)]
f = lambda image_list, idx: image_list[idx] + np.random.randn()
run_mp(images, f, 2)
After running the code, the embedded print statements confirm that all 20 images were processed and that the two processes finished. However, the program hangs. When I hit ctrl-c, I got the following output, which shows the hang occurring in os.waitpid.
How can I fix this?
worker 0 started
processing image idx 0
processing image idx 1
worker 1 started
processing image idx 2
processing image idx 3
processing image idx 4
processing image idx 5
processing image idx 6
processing image idx 7
processing image idx 8
processing image idx 9
processing image idx 10
processing image idx 11
processing image idx 12
processing image idx 13
processing image idx 14
processing image idx 15
processing image idx 16
processing image idx 17
processing image idx 18
processing image idx 19
worker 1 finished
^CTraceback (most recent call last):
  File "example_queue.py", line 35, in <module>
    run_mp(images, f, 2)
  File "example_queue.py", line 29, in run_mp
    process.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
First, note that your code will only run on platforms that use OS fork to create new processes, because:
- The code that creates the new processes is not enclosed within an if __name__ == '__main__': block.
- Your worker function f_worker is not at global scope.
(A minimal portable skeleton follows this list.)
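As an aside (this sketch is ours, not part of the original post), the portable structure looks like this: the worker lives at global scope so it can be pickled, and process creation is guarded so that spawn-based platforms such as Windows and macOS do not recurse when each child re-imports the module:

import multiprocessing

# Worker at global scope so it can be pickled on spawn-based platforms.
def f_worker(worker_index, q):
    print("worker {} got {}".format(worker_index, q.get()))

if __name__ == '__main__':
    # Only the parent executes this block; on spawn-based platforms each
    # child re-imports the module, and unguarded process creation would
    # otherwise start new processes recursively.
    q = multiprocessing.Queue()
    q.put("hello")
    p = multiprocessing.Process(target=f_worker, args=(0, q))
    p.start()
    p.join()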
If you read the documentation on multiprocessing.Queue, you will see that the call to q.empty() is completely unreliable. Fortunately, it is also unnecessary, because there is a solution that makes the call needless. Moreover, if you are interested in the actual results being put on result_q, note that you must get those results before the main process attempts to join the child processes that put them there: a child process that has put items on a queue does not terminate until its internal feeder thread has flushed all buffered items to the underlying pipe, so joining first can deadlock. The documentation explains this as well.
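Here is a minimal sketch (our illustration, not from the original post) of that join-before-get deadlock: the child's feeder thread blocks on the full pipe until the parent reads, so the parent must drain the queue before joining:

import multiprocessing

def child(q):
    # Large enough that the feeder thread cannot flush it into the
    # underlying pipe until the parent starts reading.
    q.put('x' * (1 << 20))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    # p.join() here could deadlock: the child cannot exit while its
    # feeder thread is still blocked writing the item to the pipe.
    print(len(q.get()))  # drain the queue first...
    p.join()             # ...then joining is safe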
The solution that avoids the unreliable calls to q.empty() is to write to the "task queue" (q in this case), following the actual items that need to be processed, N sentinel items that cannot be mistaken for real work items and that instead signal to the child processes that there are no more items to process. N is, of course, simply the number of child processes reading from the task queue: each child process terminates when it reads a sentinel, and there are exactly enough sentinels to signal every child process. Here None serves perfectly well as a sentinel:
import multiprocessing
import numpy as np

def run_mp(images, f_work, n_worker):
    def f_worker(worker_index, data, q, result_q):
        print("worker {} started".format(worker_index))
        while True:
            image_idx = q.get()  # Blocking get
            if image_idx is None:  # Sentinel?
                break  # We are done!
            print('processing image idx {}'.format(image_idx))
            image_out = f_work(data, image_idx)
            result_q.put((image_out, image_idx))
        print("worker {} finished".format(worker_index))
        return

    q = multiprocessing.Queue()
    for img_idx in range(len(images)):
        q.put(img_idx)
    # Add sentinels:
    for _ in range(n_worker):
        q.put(None)
    result_q = multiprocessing.Queue()

    processes = list()
    for i in range(n_worker):
        process = multiprocessing.Process(target=f_worker, args=(i, images, q, result_q))
        # We do not need daemon processes now:
        #process.daemon = True
        process.start()
        processes.append(process)

    # If we are interested in the results, we must process the result queue
    # before joining the processes. We are expecting 20 results, so:
    results = [result_q.get() for _ in range(20)]
    print(results)

    for process in processes:
        process.join()

images = [np.random.randn(100, 100) for _ in range(20)]
f = lambda image_list, idx: image_list[idx] + np.random.randn()
run_mp(images, f, 2)
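As a footnote (our addition, not part of the original answer): if you do not need the manual queue handling, multiprocessing.Pool implements the task distribution and result collection itself, so no sentinels are required. A sketch, assuming a fork-based platform as above so that the workers inherit images:

import multiprocessing
import numpy as np

images = [np.random.randn(100, 100) for _ in range(20)]

def f(idx):
    # Runs in a worker process; images is inherited via fork.
    return images[idx] + np.random.randn(), idx

if __name__ == '__main__':
    with multiprocessing.Pool(2) as pool:
        # map distributes the indices and gathers the (image_out, image_idx)
        # pairs in order; the pool is terminated on exit from the with block.
        results = pool.map(f, range(len(images)))
    print(len(results))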