Python 多处理 imap - 丢弃超时进程

Python multiprocessing imap - discard timeout processes

使用Python多处理我想捕获进程丢弃它们并继续下一个进程。

在下面的例子中,我有一个 1 和 0 的列表作为输入。 0 将启动睡眠功能以触发超时错误。触发超时的进程将重新执行,因此脚本将永远运行。

如何捕获超时错误、终止导致该错误的进程并防止该进程重新执行?我可以使用 imap 执行此操作很重要。

import time
import multiprocessing as mp

def a_func(x):
    print(x)
    if x:
        return x
    
    # Function sleeps before returning
    # to trigger timeout error
    else:
        time.sleep(2.0)
        return x


if __name__ == "__main__":
    solutions = []

    # Inputs sum to 4
    inputs = [1, 1, 0, 1, 1, 0]

    with mp.get_context("spawn").Pool(1) as pool:
        futures_res = pool.imap(a_func, inputs)
        idx = 0
        for s in (inputs):
            try:
                res = futures_res.next(timeout=0.1)
                # If successful (no time out), append the result
                solutions.append(res)
            
            except mp.context.TimeoutError:
                print(s, "err")
                # Catch time out error
                # I want this to also prevent the process from being executed again
                # solutions.append(0.0)

    # Should print 4
    print(len(solutions))
    print(solutions)

您可能对 imap 如何处理超时感到有些困惑,或者您没有清楚地表达您的问题,或者我感到困惑。所以让我们从头开始:

为了确定当您对 imap 编辑的迭代器 return 执行 next(timeout=some_value) 时是否会抛出 multiprocessing.TimeoutError 异常,计时开始于任务被进程从队列中取出来执行。因此,如果池中只有一个进程并提交了 6 个任务,则不会执行并行处理,例如,第三个任务将在第二个任务完成之前不会开始,也就是第三个任务的计时开始的时间而不是从提交所有任务开始。

但是当您遇到超时异常时,正在执行的任务实际上没有任何反应——它会继续执行。您仅从 imap 迭代 return 值 6 次。但是,如果您无限期地迭代直到出现 StopIteration 异常,您最终会看到所有任务最终都已完成并 returned 了一个值,可能会在此过程中抛出多个超时错误。

一个解决方案是继续从 inputs 列表中删除与您正在迭代其结果的任务对应的输入值,但是一旦出现超时异常,您将终止池中的剩余任务(如果有)如果 inputs 列表中仍有任何输入,请使用新的 inputs 列表重新运行 imap

三点:当你终止池时,池中的进程可能已经开始执行输入队列上的下一个任务。所以这需要是一个可重新启动的任务。您还需要将输入列表的副本传递给 imap,因为 imap“懒惰地”评估 pasaed 可迭代对象,并且您将在迭代 [=] 时修改 inputs 列表如果您没有通过副本,imapimap 中的 40=] 值仍然会评估 inputs。您应该传递比 .1 稍大的超时值,因为在我的桌面上,即使将值 1 传递给辅助函数时,我仍然时不时遇到超时异常。

import time
import multiprocessing as mp

def a_func(x):
    print(x)
    if x:
        return x

    # Function sleeps before returning
    # to trigger timeout error
    else:
        time.sleep(2.0)
        return x


if __name__ == "__main__":
    solutions = []

    # Inputs sum to 4
    inputs = [1, 1, 0, 1, 1, 0]

    while inputs:
        with mp.get_context("spawn").Pool(1) as pool:
            futures_res = pool.imap(a_func, inputs.copy())
            while inputs:
                s = inputs.pop(0)
                try:
                    res = futures_res.next(timeout=.5)
                    # If successful (no time out), append the result
                    solutions.append(res)
                except mp.context.TimeoutError:
                    print(s, "err")
                    break

    # Should print 4
    print(len(solutions))
    print(solutions)

打印:

1
1
0
0 err
1
1
0
0 err
4
[1, 1, 1, 1]