在多处理队列上使用 islice 时阻止脚本挂起

Block script from hanging when islice is used on a multiprocessing Queue

我有一个可迭代的 Python class,它环绕着一个多处理生成器。有些用例只需要生成的子集,因此它被包装在 islice 中。

然而,调用在使用 islice 时挂起,我猜是由于底层多处理进程没有意识到事情已经结束。

一个最低限度的功能示例如下:

from itertools import islice
import multiprocessing as mp

STOP_MSG = 'STOP!'

def generator(queue, max_val):
  for i in range(max_val):
    queue.put(i)
  queue.put(STOP_MSG)

class GeneratorMPProc:
  def __init__(self, max_val):
    self.max_val = max_val

  def __iter__(self):
    queue = mp.Queue()
    feeder_process = mp.Process(
      target=generator,
      args=(
        queue,
        self.max_val,
      ))
    feeder_process.start()
    msg = queue.get()
    while msg != STOP_MSG:
      yield msg
      msg = queue.get()
    feeder_process.join()

if __name__ == '__main__':
  max_val = 0xFFFFFFFFF
  end_val = 10

  psm = GeneratorMPProc(max_val)
  rsm = [i for i in islice(psm, end_val)]

如何解决此问题,以便即使在使用 islice 或任何子集选择器时它也能正确终止?

您的 isllice 呼叫不会转到 return 直到 GeneratorMPProc.iter return 秒,并且 max_val 设置为 0xFFFFFFFFF(写入队列不是最快的操作,这也会占用一些资源)。换句话说,“事情 不会 结束”,直到你的生成器函数结束,因此你的 multiprocess.Process 实际上结束并且可以加入。

max_val 设置为一个值,例如 20,您的程序将很容易终止。


if __name__ == '__main__':
    #max_val = 0xFFFFFFFFF
    max_val = 20
    end_val = 10

    psm = GeneratorMPProc(max_val)
    rsm = [i for i in islice(psm, end_val)]
    print(rsm)

打印:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

更新

您可能需要考虑使用额外参数 daemon=True 启动“生成器”进程,使其成为守护进程,然后从方法 __iter__ 中完全删除 feeder_process.join()。该代码甚至可以与您的原始 max_val.

一起使用
  def __iter__(self):
    queue = mp.Queue()
    feeder_process = mp.Process(
      target=generator,
      args=(
        queue,
        self.max_val,
      ),
      daemon=True
    )
    feeder_process.start()
    msg = queue.get()
    while msg != STOP_MSG:
      yield msg
      msg = queue.get()