如何杀死所有以线程状态为条件的线程?

How to kill all threads conditioned on status of on thread?

我同时有 n 个线程 运行。这些线程正在处理一个包含 m 个测试用例的列表。例如,线程 n-1 正在处理项目 m[i-1],而线程 n 正在处理项目 m[i]。如果例如线程 n-1 失败或 return 有信号,我想停止所有线程。我怎样才能做到这一点?

这是一个 MWE:

这是我的处理函数

def process(input_addr):
    i =+ 1
    print('Total number of executed unit tests: {}'.format(i))
    print("executed {}. thread".format(input_addr))
    try:
        command = 'python3 '+input_addr
        result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        msg, err = result.communicate()
        if msg.decode('utf-8') != '':
            stat = parse_shell(msg.decode('utf-8'))
            if stat:
                print('Test Failed')
                return True
        else:
            stat = parse_shell(err)                
            if stat:
                print('Test Failed')
                return True
    except Exception as e:
        print("thread.\nMessage:{1}".format(e))
 

这是我的游泳池:

def pre_run_test_files(self):
    with Pool(10) as p:
       p.map(process, self.test_files)

我正在使用:

from multiprocessing import Pool

我找到了解决方案:


def process(i, input_addr, event):
    kill_flag = False
    if not event.is_set():
        print('Total number of executed unit tests: {}'.format(i))
        print("executed {}. thread".format(input_addr))
        try:
            command = 'python3 '+input_addr
            result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
            msg, err = result.communicate()
            if msg.decode('utf-8') != '':
                stat = parse_shell(msg.decode('utf-8'))
                if stat:
                    print('Test Failed')
                    kill_flag = True
                    # all_run.append(input_addr)
                    #write_list_to_txt(input_addr, valid_tests)
                else:
                    kill_flag = False
            else:
                stat = parse_shell(err)                
                if stat:
                    print('Test Failed')
                    kill_flag = True
                    # all_run.append(input_addr)
                    #write_list_to_txt(input_addr, valid_tests)
                else:
                    kill_flag = False
        except Exception as e:
            print("thread.\nMessage:{1}".format(e))
    if kill_flag:
        event.set()
def manager():
     p= multiprocessing.Pool(10) 
     m = multiprocessing.Manager()
     event = m.Event()
     for i,f in enumerate(self.test_files):
         p.apply_async(process, (i, f, event))
     p.close()
     event.wait()
     p.terminate()

您可以使用辅助函数,process 只需引发异常并使用 error_callback 函数和 apply_async 调用 terminate 在池中,如以下演示所示:

from multiprocessing import Pool

def process(i):
    import time

    time.sleep(1)
    if i == 6:
        raise ValueError(f'Bad value: {i}')
    print(i, flush=True)

def my_error_callback(e):
    pool.terminate()
    print(e)

if __name__ == '__main__':
    pool = Pool(4)
    for i in range(20):
        pool.apply_async(process, args=(i,), error_callback=my_error_callback)
    # wait for all tasks to complete
    pool.close()
    pool.join()

打印:

0
1
3
2
4
5
7
Bad value: 6

您应该能够根据您的具体问题调整上述代码。

更新

因为您的原始代码使用了 map 方法,所以有第二种解决方案使用方法 imap_unordered,它将 return 是一个迭代器,在每次迭代时 return 是辅助函数的下一个 return 值,process,或者如果辅助函数引发异常,则引发异常。使用方法 imap_unordere,这些结果 returned 任意完成顺序 而不是 任务提交顺序 ,但是当使用默认的 chunksize 参数 1,此 arbitrary 顺序通常是任务完成顺序。这就是您想要的,以便您可以尽早检测到异常并终止池。当然,如果您关心来自 process 的 return 值,那么您将使用方法 imap,以便结果按任务提交顺序 returned。但是在那种情况下,如果当 case i == 6 出现异常时,但该任务恰好是第一个完成的任务,则在为 i == 1 提交的任务之前,它的异常仍然无法 returned虽然完成了 5 个。

在下面的代码中,使用了大小为 8 的池,所有任务在打印它们的参数之前首先休眠 1 秒,并且 returning except 用于这种情况of i == 6,这会立即引发异常。使用 imap_unordered 我们有:

from multiprocessing import Pool

def process(i):
    import time

    # raise an exception immediately for i == 6 without sleeping
    if (i != 6):
        time.sleep(1)
    else:
        raise ValueError(f'Bad value: {i}')
    print(i, flush=True)

if __name__ == '__main__':
    pool = Pool(8)
    results = pool.imap_unordered(process, range(20))
    try:
        # Iterate results as task complete until
        # we are done or one raises an exeption:
        for result in results:
            # we don't care about the return value:
            pass
    except Exception as e:
        pool.terminate()
        print(e)
    pool.close()
    pool.join()

打印:

Bad value: 6

如果我们用对 imap 的调用替换对 imap_unordered 的调用,则输出为:

0
1
2
3
4
5
Bad value: 6

第一个解决方案,使用 apply_asyncerror_callback 参数,允许在异常发生时立即采取行动 如果你关心任务提交顺序的结果,你可以将apply_async编辑的multiprocessing.AsyncResult对象保存在列表中并调用get 在这些对象上。尝试将 RAISE_EXCEPTION 设置为 True 然后设置为 False 的以下代码:

from multiprocessing import Pool
import time

RAISE_EXCEPTION = True

def process(i):
    if RAISE_EXCEPTION and i == 6:
        raise ValueError(f'Bad value: {i}')
    time.sleep(1)
    return i # instead of printing

def my_error_callback(e):
    global got_exception

    got_exception = True
    pool.terminate()
    print(e)

if __name__ == '__main__':
    got_exception = False
    pool = Pool(4)
    async_results = [pool.apply_async(process, args=(i,), error_callback=my_error_callback) for i in range(20)]
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    if not got_exception:
        for async_result in async_results:
            print(async_result.get())