如何杀死所有以线程状态为条件的线程?
How to kill all threads conditioned on status of on thread?
我同时有 n
个线程 运行。这些线程正在处理一个包含 m
个测试用例的列表。例如,线程 n-1
正在处理项目 m[i-1]
,而线程 n
正在处理项目 m[i]
。如果例如线程 n-1
失败或 return 有信号,我想停止所有线程。我怎样才能做到这一点?
这是一个 MWE:
这是我的处理函数
def process(input_addr):
i =+ 1
print('Total number of executed unit tests: {}'.format(i))
print("executed {}. thread".format(input_addr))
try:
command = 'python3 '+input_addr
result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
msg, err = result.communicate()
if msg.decode('utf-8') != '':
stat = parse_shell(msg.decode('utf-8'))
if stat:
print('Test Failed')
return True
else:
stat = parse_shell(err)
if stat:
print('Test Failed')
return True
except Exception as e:
print("thread.\nMessage:{1}".format(e))
这是我的游泳池:
def pre_run_test_files(self):
with Pool(10) as p:
p.map(process, self.test_files)
我正在使用:
from multiprocessing import Pool
我找到了解决方案:
def process(i, input_addr, event):
kill_flag = False
if not event.is_set():
print('Total number of executed unit tests: {}'.format(i))
print("executed {}. thread".format(input_addr))
try:
command = 'python3 '+input_addr
result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
msg, err = result.communicate()
if msg.decode('utf-8') != '':
stat = parse_shell(msg.decode('utf-8'))
if stat:
print('Test Failed')
kill_flag = True
# all_run.append(input_addr)
#write_list_to_txt(input_addr, valid_tests)
else:
kill_flag = False
else:
stat = parse_shell(err)
if stat:
print('Test Failed')
kill_flag = True
# all_run.append(input_addr)
#write_list_to_txt(input_addr, valid_tests)
else:
kill_flag = False
except Exception as e:
print("thread.\nMessage:{1}".format(e))
if kill_flag:
event.set()
def manager():
p= multiprocessing.Pool(10)
m = multiprocessing.Manager()
event = m.Event()
for i,f in enumerate(self.test_files):
p.apply_async(process, (i, f, event))
p.close()
event.wait()
p.terminate()
您可以使用辅助函数,process
只需引发异常并使用 error_callback 函数和 apply_async
调用 terminate
在池中,如以下演示所示:
from multiprocessing import Pool
def process(i):
import time
time.sleep(1)
if i == 6:
raise ValueError(f'Bad value: {i}')
print(i, flush=True)
def my_error_callback(e):
pool.terminate()
print(e)
if __name__ == '__main__':
pool = Pool(4)
for i in range(20):
pool.apply_async(process, args=(i,), error_callback=my_error_callback)
# wait for all tasks to complete
pool.close()
pool.join()
打印:
0
1
3
2
4
5
7
Bad value: 6
您应该能够根据您的具体问题调整上述代码。
更新
因为您的原始代码使用了 map
方法,所以有第二种解决方案使用方法 imap_unordered
,它将 return 是一个迭代器,在每次迭代时 return 是辅助函数的下一个 return 值,process
,或者如果辅助函数引发异常,则引发异常。使用方法 imap_unordere
,这些结果 returned 任意完成顺序 而不是 任务提交顺序 ,但是当使用默认的 chunksize 参数 1,此 arbitrary 顺序通常是任务完成顺序。这就是您想要的,以便您可以尽早检测到异常并终止池。当然,如果您关心来自 process
的 return 值,那么您将使用方法 imap
,以便结果按任务提交顺序 returned。但是在那种情况下,如果当 case i == 6 出现异常时,但该任务恰好是第一个完成的任务,则在为 i == 1 提交的任务之前,它的异常仍然无法 returned虽然完成了 5 个。
在下面的代码中,使用了大小为 8 的池,所有任务在打印它们的参数之前首先休眠 1 秒,并且 returning except 用于这种情况of i == 6,这会立即引发异常。使用 imap_unordered
我们有:
from multiprocessing import Pool
def process(i):
import time
# raise an exception immediately for i == 6 without sleeping
if (i != 6):
time.sleep(1)
else:
raise ValueError(f'Bad value: {i}')
print(i, flush=True)
if __name__ == '__main__':
pool = Pool(8)
results = pool.imap_unordered(process, range(20))
try:
# Iterate results as task complete until
# we are done or one raises an exeption:
for result in results:
# we don't care about the return value:
pass
except Exception as e:
pool.terminate()
print(e)
pool.close()
pool.join()
打印:
Bad value: 6
如果我们用对 imap
的调用替换对 imap_unordered
的调用,则输出为:
0
1
2
3
4
5
Bad value: 6
第一个解决方案,使用 apply_async
和 error_callback 参数,允许在异常发生时立即采取行动 和如果你关心任务提交顺序的结果,你可以将apply_async
编辑的multiprocessing.AsyncResult
对象保存在列表中并调用get
在这些对象上。尝试将 RAISE_EXCEPTION
设置为 True
然后设置为 False
的以下代码:
from multiprocessing import Pool
import time
RAISE_EXCEPTION = True
def process(i):
if RAISE_EXCEPTION and i == 6:
raise ValueError(f'Bad value: {i}')
time.sleep(1)
return i # instead of printing
def my_error_callback(e):
global got_exception
got_exception = True
pool.terminate()
print(e)
if __name__ == '__main__':
got_exception = False
pool = Pool(4)
async_results = [pool.apply_async(process, args=(i,), error_callback=my_error_callback) for i in range(20)]
# Wait for all tasks to complete:
pool.close()
pool.join()
if not got_exception:
for async_result in async_results:
print(async_result.get())
我同时有 n
个线程 运行。这些线程正在处理一个包含 m
个测试用例的列表。例如,线程 n-1
正在处理项目 m[i-1]
,而线程 n
正在处理项目 m[i]
。如果例如线程 n-1
失败或 return 有信号,我想停止所有线程。我怎样才能做到这一点?
这是一个 MWE:
这是我的处理函数
def process(input_addr):
i =+ 1
print('Total number of executed unit tests: {}'.format(i))
print("executed {}. thread".format(input_addr))
try:
command = 'python3 '+input_addr
result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
msg, err = result.communicate()
if msg.decode('utf-8') != '':
stat = parse_shell(msg.decode('utf-8'))
if stat:
print('Test Failed')
return True
else:
stat = parse_shell(err)
if stat:
print('Test Failed')
return True
except Exception as e:
print("thread.\nMessage:{1}".format(e))
这是我的游泳池:
def pre_run_test_files(self):
with Pool(10) as p:
p.map(process, self.test_files)
我正在使用:
from multiprocessing import Pool
我找到了解决方案:
def process(i, input_addr, event):
kill_flag = False
if not event.is_set():
print('Total number of executed unit tests: {}'.format(i))
print("executed {}. thread".format(input_addr))
try:
command = 'python3 '+input_addr
result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
msg, err = result.communicate()
if msg.decode('utf-8') != '':
stat = parse_shell(msg.decode('utf-8'))
if stat:
print('Test Failed')
kill_flag = True
# all_run.append(input_addr)
#write_list_to_txt(input_addr, valid_tests)
else:
kill_flag = False
else:
stat = parse_shell(err)
if stat:
print('Test Failed')
kill_flag = True
# all_run.append(input_addr)
#write_list_to_txt(input_addr, valid_tests)
else:
kill_flag = False
except Exception as e:
print("thread.\nMessage:{1}".format(e))
if kill_flag:
event.set()
def manager():
p= multiprocessing.Pool(10)
m = multiprocessing.Manager()
event = m.Event()
for i,f in enumerate(self.test_files):
p.apply_async(process, (i, f, event))
p.close()
event.wait()
p.terminate()
您可以使用辅助函数,process
只需引发异常并使用 error_callback 函数和 apply_async
调用 terminate
在池中,如以下演示所示:
from multiprocessing import Pool
def process(i):
import time
time.sleep(1)
if i == 6:
raise ValueError(f'Bad value: {i}')
print(i, flush=True)
def my_error_callback(e):
pool.terminate()
print(e)
if __name__ == '__main__':
pool = Pool(4)
for i in range(20):
pool.apply_async(process, args=(i,), error_callback=my_error_callback)
# wait for all tasks to complete
pool.close()
pool.join()
打印:
0
1
3
2
4
5
7
Bad value: 6
您应该能够根据您的具体问题调整上述代码。
更新
因为您的原始代码使用了 map
方法,所以有第二种解决方案使用方法 imap_unordered
,它将 return 是一个迭代器,在每次迭代时 return 是辅助函数的下一个 return 值,process
,或者如果辅助函数引发异常,则引发异常。使用方法 imap_unordere
,这些结果 returned 任意完成顺序 而不是 任务提交顺序 ,但是当使用默认的 chunksize 参数 1,此 arbitrary 顺序通常是任务完成顺序。这就是您想要的,以便您可以尽早检测到异常并终止池。当然,如果您关心来自 process
的 return 值,那么您将使用方法 imap
,以便结果按任务提交顺序 returned。但是在那种情况下,如果当 case i == 6 出现异常时,但该任务恰好是第一个完成的任务,则在为 i == 1 提交的任务之前,它的异常仍然无法 returned虽然完成了 5 个。
在下面的代码中,使用了大小为 8 的池,所有任务在打印它们的参数之前首先休眠 1 秒,并且 returning except 用于这种情况of i == 6,这会立即引发异常。使用 imap_unordered
我们有:
from multiprocessing import Pool
def process(i):
import time
# raise an exception immediately for i == 6 without sleeping
if (i != 6):
time.sleep(1)
else:
raise ValueError(f'Bad value: {i}')
print(i, flush=True)
if __name__ == '__main__':
pool = Pool(8)
results = pool.imap_unordered(process, range(20))
try:
# Iterate results as task complete until
# we are done or one raises an exeption:
for result in results:
# we don't care about the return value:
pass
except Exception as e:
pool.terminate()
print(e)
pool.close()
pool.join()
打印:
Bad value: 6
如果我们用对 imap
的调用替换对 imap_unordered
的调用,则输出为:
0
1
2
3
4
5
Bad value: 6
第一个解决方案,使用 apply_async
和 error_callback 参数,允许在异常发生时立即采取行动 和如果你关心任务提交顺序的结果,你可以将apply_async
编辑的multiprocessing.AsyncResult
对象保存在列表中并调用get
在这些对象上。尝试将 RAISE_EXCEPTION
设置为 True
然后设置为 False
的以下代码:
from multiprocessing import Pool
import time
RAISE_EXCEPTION = True
def process(i):
if RAISE_EXCEPTION and i == 6:
raise ValueError(f'Bad value: {i}')
time.sleep(1)
return i # instead of printing
def my_error_callback(e):
global got_exception
got_exception = True
pool.terminate()
print(e)
if __name__ == '__main__':
got_exception = False
pool = Pool(4)
async_results = [pool.apply_async(process, args=(i,), error_callback=my_error_callback) for i in range(20)]
# Wait for all tasks to complete:
pool.close()
pool.join()
if not got_exception:
for async_result in async_results:
print(async_result.get())