在多处理队列上使用 islice 时阻止脚本挂起
Block script from hanging when islice is used on a multiprocessing Queue
我有一个可迭代的 Python class,它环绕着一个多处理生成器。有些用例只需要生成的子集,因此它被包装在 islice 中。
然而,调用在使用 islice 时挂起,我猜是由于底层多处理进程没有意识到事情已经结束。
一个最低限度的功能示例如下:
from itertools import islice
import multiprocessing as mp
STOP_MSG = 'STOP!'
def generator(queue, max_val):
for i in range(max_val):
queue.put(i)
queue.put(STOP_MSG)
class GeneratorMPProc:
def __init__(self, max_val):
self.max_val = max_val
def __iter__(self):
queue = mp.Queue()
feeder_process = mp.Process(
target=generator,
args=(
queue,
self.max_val,
))
feeder_process.start()
msg = queue.get()
while msg != STOP_MSG:
yield msg
msg = queue.get()
feeder_process.join()
if __name__ == '__main__':
max_val = 0xFFFFFFFFF
end_val = 10
psm = GeneratorMPProc(max_val)
rsm = [i for i in islice(psm, end_val)]
如何解决此问题,以便即使在使用 islice 或任何子集选择器时它也能正确终止?
您的 isllice
呼叫不会转到 return 直到 GeneratorMPProc.iter
return 秒,并且 max_val
设置为 0xFFFFFFFFF
(写入队列不是最快的操作,这也会占用一些资源)。换句话说,“事情 不会 结束”,直到你的生成器函数结束,因此你的 multiprocess.Process
实际上结束并且可以加入。
将 max_val
设置为一个值,例如 20,您的程序将很容易终止。
if __name__ == '__main__':
#max_val = 0xFFFFFFFFF
max_val = 20
end_val = 10
psm = GeneratorMPProc(max_val)
rsm = [i for i in islice(psm, end_val)]
print(rsm)
打印:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
更新
您可能需要考虑使用额外参数 daemon=True
启动“生成器”进程,使其成为守护进程,然后从方法 __iter__
中完全删除 feeder_process.join()
。该代码甚至可以与您的原始 max_val
.
一起使用
def __iter__(self):
queue = mp.Queue()
feeder_process = mp.Process(
target=generator,
args=(
queue,
self.max_val,
),
daemon=True
)
feeder_process.start()
msg = queue.get()
while msg != STOP_MSG:
yield msg
msg = queue.get()
我有一个可迭代的 Python class,它环绕着一个多处理生成器。有些用例只需要生成的子集,因此它被包装在 islice 中。
然而,调用在使用 islice 时挂起,我猜是由于底层多处理进程没有意识到事情已经结束。
一个最低限度的功能示例如下:
from itertools import islice
import multiprocessing as mp
STOP_MSG = 'STOP!'
def generator(queue, max_val):
for i in range(max_val):
queue.put(i)
queue.put(STOP_MSG)
class GeneratorMPProc:
def __init__(self, max_val):
self.max_val = max_val
def __iter__(self):
queue = mp.Queue()
feeder_process = mp.Process(
target=generator,
args=(
queue,
self.max_val,
))
feeder_process.start()
msg = queue.get()
while msg != STOP_MSG:
yield msg
msg = queue.get()
feeder_process.join()
if __name__ == '__main__':
max_val = 0xFFFFFFFFF
end_val = 10
psm = GeneratorMPProc(max_val)
rsm = [i for i in islice(psm, end_val)]
如何解决此问题,以便即使在使用 islice 或任何子集选择器时它也能正确终止?
您的 isllice
呼叫不会转到 return 直到 GeneratorMPProc.iter
return 秒,并且 max_val
设置为 0xFFFFFFFFF
(写入队列不是最快的操作,这也会占用一些资源)。换句话说,“事情 不会 结束”,直到你的生成器函数结束,因此你的 multiprocess.Process
实际上结束并且可以加入。
将 max_val
设置为一个值,例如 20,您的程序将很容易终止。
if __name__ == '__main__':
#max_val = 0xFFFFFFFFF
max_val = 20
end_val = 10
psm = GeneratorMPProc(max_val)
rsm = [i for i in islice(psm, end_val)]
print(rsm)
打印:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
更新
您可能需要考虑使用额外参数 daemon=True
启动“生成器”进程,使其成为守护进程,然后从方法 __iter__
中完全删除 feeder_process.join()
。该代码甚至可以与您的原始 max_val
.
def __iter__(self):
queue = mp.Queue()
feeder_process = mp.Process(
target=generator,
args=(
queue,
self.max_val,
),
daemon=True
)
feeder_process.start()
msg = queue.get()
while msg != STOP_MSG:
yield msg
msg = queue.get()