Python multiprocessing: terminate processes by arguments after exceeding timeout
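I want to measure the running time of a long-running function for different values of k and n.
For example: k = [2,3,...,100] and n = [50,100,150,200,...,1000].
If the running time of a particular (k, n) tuple exceeds a certain amount of time (e.g., 60 seconds), I want to stop the execution of every process with that n or a larger n (and set its running time to inf).
For example, if n = 500 times out, I want to cancel the execution of all tasks with n >= 500.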
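The bookkeeping described above (every n at or above the first n that times out gets a running time of inf) can be sketched as a small post-processing step. This is only an illustration: mark_timed_out is a hypothetical helper, and it assumes results are (k, n, seconds) tuples like the ones long_running_function returns.

```python
import math
from typing import Iterable, List, Tuple

Result = Tuple[int, int, float]  # (k, n, running time in seconds)


def mark_timed_out(results: Iterable[Result], cutoff_n: int) -> List[Result]:
    """Hypothetical helper: replace the running time with inf for every
    result whose n is >= cutoff_n (the first n that timed out)."""
    return [(k, n, math.inf if n >= cutoff_n else t) for k, n, t in results]


if __name__ == "__main__":
    sample = [(2, 450, 4.8), (2, 500, 60.0), (3, 550, 1.2)]
    print(mark_timed_out(sample, cutoff_n=500))
    # [(2, 450, 4.8), (2, 500, inf), (3, 550, inf)]
```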
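I tried using Python's multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor, but I couldn't find a way to cancel tasks that are already running. As far as I know, a task cannot be cancelled while it is running.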
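That matches the documented behavior of concurrent.futures: Future.cancel() returns False for a call that is already executing and True only for a call that is still pending. The sketch below shows this with a ThreadPoolExecutor, chosen only so the demo needs no child processes; ProcessPoolExecutor futures follow the same cancel() rules, although a process pool may already mark a few queued calls as running. demo_cancel is a hypothetical name.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def demo_cancel():
    """Return (cancelled_running, cancelled_pending, running_result)."""
    started = threading.Event()

    def task():
        started.set()
        time.sleep(0.5)
        return "done"

    with ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(task)
        pending = executor.submit(task)  # queued behind `running`: only 1 worker
        started.wait()                   # make sure the first call is executing
        cancelled_running = running.cancel()  # False: already running
        cancelled_pending = pending.cancel()  # True: it never started
        running_result = running.result()     # the running call completes anyway
    return cancelled_running, cancelled_pending, running_result


if __name__ == "__main__":
    print(demo_cancel())  # (False, True, 'done')
```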
Maybe I should approach this problem differently.
Please advise.
import multiprocessing
import random
import time
from datetime import datetime
from itertools import product
from multiprocessing import Pool

n_list = [n * 50 for n in range(1, 21)]
k_list = [k for k in range(2, 31)]
k_n_list = list(product(k_list, n_list))


def long_running_function(k, n):
    start_time = datetime.now()
    time.sleep(random.randint(2, 120))
    end_time = datetime.now()
    running_time = end_time - start_time
    return k, n, running_time.total_seconds()


running_times = []
with Pool(processes=multiprocessing.cpu_count()) as pool:
    async_results = []
    for k, n in k_n_list:
        async_results.append((k, n, pool.apply_async(func=long_running_function, args=(k, n))))

    for k, n, result in async_results:
        try:
            process_result = result.get(60)  # timeout after 60 seconds
            running_times.append(process_result)
        except multiprocessing.TimeoutError:
            print(f"Timeout for k = {k}, n = {n}")
            running_times.append((k, n, float('inf')))
            # HERE I WOULD LIKE TO CANCEL EVERY TASK WITH N >= n
The first thing you should be aware of is that the following statement...
process_result = result.get(60) # timeout after 60 seconds
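... will raise a multiprocessing.TimeoutError if the task associated with result has not yet completed, but it does not terminate the task; the task keeps running. However, when pool.terminate() is called, either implicitly when you exit the with Pool(...) as pool: block or explicitly, all the processes in the pool (and, of course, the tasks they are currently running) are terminated. But don't even consider using a process pool created by concurrent.futures: it offers no public way to terminate its worker processes while they are still running tasks.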
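A quick way to see that get(timeout) only stops waiting, without cancelling anything, is the sketch below. It uses multiprocessing.pool.ThreadPool instead of Pool purely so that it runs without child processes; its AsyncResult objects have the same get(), ready() and wait() methods. Note that ThreadPool.terminate() cannot kill a running thread, so only a process Pool gives you the forced termination described above. demo_get_timeout is a hypothetical name.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def demo_get_timeout():
    """Return (timed_out, ready_right_after_timeout, ready_after_wait)."""
    with ThreadPool(processes=1) as pool:
        # The task sleeps for 1 s, but we only wait 0.1 s for its result.
        result = pool.apply_async(time.sleep, (1.0,))
        timed_out = False
        try:
            result.get(timeout=0.1)
        except multiprocessing.TimeoutError:
            timed_out = True
        # The timeout did not cancel anything: the task is still running.
        ready_right_after_timeout = result.ready()
        # Block until the task finishes on its own.
        result.wait()
        ready_after_wait = result.ready()
    return timed_out, ready_right_after_timeout, ready_after_wait


if __name__ == "__main__":
    print(demo_get_timeout())  # (True, False, True)
```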
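Second, you are running len(k_list) * len(n_list) tasks (580 with your lists) in a pool of size multiprocessing.cpu_count(), so the number of tasks may be far larger than the number of processes in your pool. It is therefore quite possible that, by the time you discover that one of your tasks has not completed within 60 seconds, many other tasks have not even started running. This will always be a problem: you mean to give every task with a given n value 60 seconds to complete, but many of those tasks will be terminated without ever having had a chance to start.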
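To put numbers on this: the question's lists give 29 values of k and 20 values of n, and a pool of P processes can run at most P tasks at a time. The hypothetical helper below just does that arithmetic; the 8-process pool in the example is an assumption.

```python
from itertools import product


def queue_stats(k_list, n_list, pool_size):
    """Return (total tasks, tasks that can run at once,
    tasks still queued while the first batch runs)."""
    total = len(list(product(k_list, n_list)))
    running = min(pool_size, total)
    return total, running, total - running


if __name__ == "__main__":
    k_list = list(range(2, 31))              # 29 values, as in the question
    n_list = [n * 50 for n in range(1, 21)]  # 20 values, as in the question
    print(queue_stats(k_list, n_list, pool_size=8))  # (580, 8, 572)
```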
Third, in the loop in which you execute...
process_result = result.get(60)
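... the AsyncResult instance you are testing may return a result after 3 seconds (rather than timing out). But by then 3 seconds have already elapsed since you submitted the tasks, so on the next iteration you really only want to wait 57 more seconds for the next result, not another 60!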
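One way to avoid that is to compute a single deadline once and, for each result, wait only for the time that is left. A minimal sketch: remaining is a hypothetical helper, and it uses time.monotonic() (a swap-in; the answer's code below uses time.time()) so that changes to the system clock cannot stretch or shorten the wait.

```python
import time


def remaining(deadline: float) -> float:
    """Seconds left until `deadline` (a time.monotonic() timestamp), never negative."""
    return max(0.0, deadline - time.monotonic())


# Hypothetical usage inside the question's loop: one shared deadline for all
# results instead of a fresh 60-second timeout per result.
#
#   deadline = time.monotonic() + 60
#   for k, n, result in async_results:
#       process_result = result.get(remaining(deadline))
```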
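One possible solution is to use a multiprocessing.Value instance initialized to sys.maxsize. It lives in shared memory, so it is visible to all tasks in all processes. Your worker function must check this Value periodically; if its value is less than or equal to the n the worker is processing, that is the signal for the worker to return normally right away. The code then becomes something like the following (note that I have changed some parameters for demonstration purposes):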
import multiprocessing
import random
import time
from datetime import datetime
from itertools import product
from multiprocessing import Pool, Value
import sys
import ctypes


def init_pool(v):
    # Make the shared Value available to the worker as a global:
    global stop_n
    stop_n = v


def long_running_function(k, n):
    print('n =', n)
    start_time = datetime.now()
    #sleep_time = random.randint(2, 10)
    sleep_time = n / 100 + .3
    t_stop = time.time() + sleep_time
    # Simulate work in 0.1-second slices, checking the stop signal between slices:
    while time.time() < t_stop:
        if n >= stop_n.value:
            print('quitting because my n is', n)
            break
        time.sleep(.1)
    end_time = datetime.now()
    running_time = end_time - start_time
    return k, n, running_time.total_seconds()


# required for Windows:
if __name__ == '__main__':
    n_list = [n * 50 for n in range(1, 20)]
    k_list = [k for k in range(2, 3)]
    k_n_list = list(product(k_list, n_list))

    running_times = []
    stop_n = Value(ctypes.c_ulonglong, sys.maxsize)
    # best to leave one processor free for main process (but always use at least one worker)
    with Pool(processes=max(1, multiprocessing.cpu_count() - 1), initializer=init_pool, initargs=(stop_n,)) as pool:
        async_results = []
        for k, n in k_n_list:
            async_results.append((k, n, pool.apply_async(func=long_running_function, args=(k, n))))

        TIMEOUT = 4  # timeout after 4 seconds
        start_time = time.time()
        for k, n, result in async_results:
            try:
                # Wait only for whatever is left of the overall TIMEOUT:
                time_to_wait = TIMEOUT - (time.time() - start_time)
                if time_to_wait < 0:
                    time_to_wait = 0
                process_result = result.get(time_to_wait)
            except multiprocessing.TimeoutError:
                # signal to tasks whose n argument is >= this value of n:
                print('setting stop value to', n)
                stop_n.value = n
                break

        # now process actual results:
        for k, n, result in async_results:
            process_result = result.get()
            running_times.append(process_result)

    print(running_times)
Prints:
n = 50
n = 100
n = 150
n = 200
n = 250
n = 300
n = 350
n = 400
n = 450
n = 500
n = 550
n = 600
n = 650
n = 700
setting stop value to 400
quitting because my n is 400
n = 750
quitting because my n is 750
n = 800
quitting because my n is 800
quitting because my n is 500
quitting because my n is 450
n = 850
n = 900
n = 950
quitting because my n is 850
quitting because my n is 900
quitting because my n is 950
quitting because my n is 550
quitting because my n is 600
quitting because my n is 650
quitting because my n is 700
[(2, 50, 0.803502), (2, 100, 1.306462), (2, 150, 1.807341), (2, 200, 2.308982), (2, 250, 2.812402), (2, 300, 3.315068), (2, 350, 3.81634), (2, 400, 3.114924), (2, 450, 2.627066), (2, 500, 2.124075), (2, 550, 1.607504), (2, 600, 1.104059), (2, 650, 0.604383), (2, 700, 0.100104), (2, 750, 0.001005), (2, 800, 0.000999), (2, 850, 0.002), (2, 900, 0.001999), (2, 950, 0.001999)]
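You will see that on my 8-core desktop, with 7 of the cores assigned to the pool, several tasks were still waiting to start when the shared Value was set to 400, so when they did start they quit immediately (you can see that their running times are very small). As I said, what you are trying to do this way is problematic. It might be better if, after the Value has been set to n, each affected task, instead of returning immediately, gave itself a certain number of seconds to finish.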
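A sketch of that grace-period idea, assuming the worker can do its work in small steps like the demo code above. work_with_grace and its grace_seconds parameter are hypothetical; the shared Value is passed in as an argument here (rather than through init_pool's global) only so the function can be tried in a single process, and time.monotonic() is used instead of time.time().

```python
import ctypes
import time
from multiprocessing import Value


def work_with_grace(n, stop_n, work_seconds, grace_seconds, poll=0.05):
    """Hypothetical worker: simulates `work_seconds` of work in small sleeps.
    Once stop_n.value <= n, it allows itself at most `grace_seconds` more
    before giving up. Returns (finished, elapsed_seconds)."""
    start = time.monotonic()
    work_end = start + work_seconds
    give_up_at = None  # set when the stop signal is first seen
    while True:
        now = time.monotonic()
        if now >= work_end:
            return True, now - start
        if give_up_at is None and n >= stop_n.value:
            give_up_at = now + grace_seconds
        if give_up_at is not None and now >= give_up_at:
            return False, now - start
        time.sleep(poll)


if __name__ == "__main__":
    stop_n = Value(ctypes.c_ulonglong, 400)  # as if n = 400 had already timed out
    print(work_with_grace(300, stop_n, 0.2, 0.1)[0])  # True: n < 400, never signalled
    print(work_with_grace(500, stop_n, 0.2, 1.0)[0])  # True: finishes within its grace
    print(work_with_grace(500, stop_n, 5.0, 0.2)[0])  # False: gives up after about 0.2 s
```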
Update
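If you want tasks that have already started to run to completion anyway (for instance, because they have no way of checking stop_n periodically), change long_running_function to: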
def long_running_function(k, n):
    start_time = datetime.now()
    print('n =', n)
    # Check the stop signal only once, when the task starts:
    if n < stop_n.value:
        #time.sleep(random.randint(2, 10))
        time.sleep(n / 100 + .3)
    else:
        print('quitting because my n is', n)
    end_time = datetime.now()
    running_time = end_time - start_time
    return k, n, running_time.total_seconds()
It now prints:
n = 50
n = 100
n = 150
n = 200
n = 250
n = 300
n = 350
n = 400
n = 450
n = 500
n = 550
n = 600
n = 650
n = 700
setting stop value to 400
n = 750
quitting because my n is 750
n = 800
quitting because my n is 800
n = 850
quitting because my n is 850
n = 900
quitting because my n is 900
n = 950
quitting because my n is 950
[(2, 50, 0.801908), (2, 100, 1.300968), (2, 150, 1.800735), (2, 200, 2.301075), (2, 250, 2.800968), (2, 300, 3.301077), (2, 350, 3.800717), (2, 400, 4.301718), (2, 450, 4.801664), (2, 500, 5.301043), (2, 550, 5.800506), (2, 600, 6.300665), (2, 650, 6.800603), (2, 700, 7.301471), (2, 750, 0.0), (2, 800, 0.0), (2, 850, 0.0), (2, 900, 0.0), (2, 950, 0.001015)]