Python 多处理 imap - 丢弃超时进程
Python multiprocessing imap - discard timeout processes
使用Python多处理我想捕获进程丢弃它们并继续下一个进程。
在下面的例子中,我有一个 1 和 0 的列表作为输入。 0 将启动睡眠功能以触发超时错误。触发超时的进程将重新执行,因此脚本将永远运行。
如何捕获超时错误、终止导致该错误的进程并防止该进程重新执行?我可以使用 imap 执行此操作很重要。
import time
import multiprocessing as mp
def a_func(x):
print(x)
if x:
return x
# Function sleeps before returning
# to trigger timeout error
else:
time.sleep(2.0)
return x
if __name__ == "__main__":
solutions = []
# Inputs sum to 4
inputs = [1, 1, 0, 1, 1, 0]
with mp.get_context("spawn").Pool(1) as pool:
futures_res = pool.imap(a_func, inputs)
idx = 0
for s in (inputs):
try:
res = futures_res.next(timeout=0.1)
# If successful (no time out), append the result
solutions.append(res)
except mp.context.TimeoutError:
print(s, "err")
# Catch time out error
# I want this to also prevent the process from being executed again
# solutions.append(0.0)
# Should print 4
print(len(solutions))
print(solutions)
您可能对 imap
如何处理超时感到有些困惑,或者您没有清楚地表达您的问题,或者我感到困惑。所以让我们从头开始:
为了确定当您对 imap
编辑的迭代器 return 执行 next(timeout=some_value)
时是否会抛出 multiprocessing.TimeoutError
异常,计时开始于任务被进程从队列中取出来执行。因此,如果池中只有一个进程并提交了 6 个任务,则不会执行并行处理,例如,第三个任务将在第二个任务完成之前不会开始,也就是第三个任务的计时开始的时间而不是从提交所有任务开始。
但是当您遇到超时异常时,正在执行的任务实际上没有任何反应——它会继续执行。您仅从 imap
迭代 return 值 6 次。但是,如果您无限期地迭代直到出现 StopIteration
异常,您最终会看到所有任务最终都已完成并 returned 了一个值,可能会在此过程中抛出多个超时错误。
一个解决方案是继续从 inputs
列表中删除与您正在迭代其结果的任务对应的输入值,但是一旦出现超时异常,您将终止池中的剩余任务(如果有)如果 inputs
列表中仍有任何输入,请使用新的 inputs
列表重新运行 imap
。
三点:当你终止池时,池中的进程可能已经开始执行输入队列上的下一个任务。所以这需要是一个可重新启动的任务。您还需要将输入列表的副本传递给 imap
,因为 imap
“懒惰地”评估 pasaed 可迭代对象,并且您将在迭代 [=] 时修改 inputs
列表如果您没有通过副本,imap
和 imap
中的 40=] 值仍然会评估 inputs
。您应该传递比 .1 稍大的超时值,因为在我的桌面上,即使将值 1 传递给辅助函数时,我仍然时不时遇到超时异常。
import time
import multiprocessing as mp
def a_func(x):
print(x)
if x:
return x
# Function sleeps before returning
# to trigger timeout error
else:
time.sleep(2.0)
return x
if __name__ == "__main__":
solutions = []
# Inputs sum to 4
inputs = [1, 1, 0, 1, 1, 0]
while inputs:
with mp.get_context("spawn").Pool(1) as pool:
futures_res = pool.imap(a_func, inputs.copy())
while inputs:
s = inputs.pop(0)
try:
res = futures_res.next(timeout=.5)
# If successful (no time out), append the result
solutions.append(res)
except mp.context.TimeoutError:
print(s, "err")
break
# Should print 4
print(len(solutions))
print(solutions)
打印:
1
1
0
0 err
1
1
0
0 err
4
[1, 1, 1, 1]
使用Python多处理我想捕获进程丢弃它们并继续下一个进程。
在下面的例子中,我有一个 1 和 0 的列表作为输入。 0 将启动睡眠功能以触发超时错误。触发超时的进程将重新执行,因此脚本将永远运行。
如何捕获超时错误、终止导致该错误的进程并防止该进程重新执行?我可以使用 imap 执行此操作很重要。
import time
import multiprocessing as mp
def a_func(x):
print(x)
if x:
return x
# Function sleeps before returning
# to trigger timeout error
else:
time.sleep(2.0)
return x
if __name__ == "__main__":
solutions = []
# Inputs sum to 4
inputs = [1, 1, 0, 1, 1, 0]
with mp.get_context("spawn").Pool(1) as pool:
futures_res = pool.imap(a_func, inputs)
idx = 0
for s in (inputs):
try:
res = futures_res.next(timeout=0.1)
# If successful (no time out), append the result
solutions.append(res)
except mp.context.TimeoutError:
print(s, "err")
# Catch time out error
# I want this to also prevent the process from being executed again
# solutions.append(0.0)
# Should print 4
print(len(solutions))
print(solutions)
您可能对 imap
如何处理超时感到有些困惑,或者您没有清楚地表达您的问题,或者我感到困惑。所以让我们从头开始:
为了确定当您对 imap
编辑的迭代器 return 执行 next(timeout=some_value)
时是否会抛出 multiprocessing.TimeoutError
异常,计时开始于任务被进程从队列中取出来执行。因此,如果池中只有一个进程并提交了 6 个任务,则不会执行并行处理,例如,第三个任务将在第二个任务完成之前不会开始,也就是第三个任务的计时开始的时间而不是从提交所有任务开始。
但是当您遇到超时异常时,正在执行的任务实际上没有任何反应——它会继续执行。您仅从 imap
迭代 return 值 6 次。但是,如果您无限期地迭代直到出现 StopIteration
异常,您最终会看到所有任务最终都已完成并 returned 了一个值,可能会在此过程中抛出多个超时错误。
一个解决方案是继续从 inputs
列表中删除与您正在迭代其结果的任务对应的输入值,但是一旦出现超时异常,您将终止池中的剩余任务(如果有)如果 inputs
列表中仍有任何输入,请使用新的 inputs
列表重新运行 imap
。
三点:当你终止池时,池中的进程可能已经开始执行输入队列上的下一个任务。所以这需要是一个可重新启动的任务。您还需要将输入列表的副本传递给 imap
,因为 imap
“懒惰地”评估 pasaed 可迭代对象,并且您将在迭代 [=] 时修改 inputs
列表如果您没有通过副本,imap
和 imap
中的 40=] 值仍然会评估 inputs
。您应该传递比 .1 稍大的超时值,因为在我的桌面上,即使将值 1 传递给辅助函数时,我仍然时不时遇到超时异常。
import time
import multiprocessing as mp
def a_func(x):
print(x)
if x:
return x
# Function sleeps before returning
# to trigger timeout error
else:
time.sleep(2.0)
return x
if __name__ == "__main__":
solutions = []
# Inputs sum to 4
inputs = [1, 1, 0, 1, 1, 0]
while inputs:
with mp.get_context("spawn").Pool(1) as pool:
futures_res = pool.imap(a_func, inputs.copy())
while inputs:
s = inputs.pop(0)
try:
res = futures_res.next(timeout=.5)
# If successful (no time out), append the result
solutions.append(res)
except mp.context.TimeoutError:
print(s, "err")
break
# Should print 4
print(len(solutions))
print(solutions)
打印:
1
1
0
0 err
1
1
0
0 err
4
[1, 1, 1, 1]