Python: 根据异常类型终止所有活动进程

Python: terminate all alive processes based on an exception type

使用 multiprocessing 的 Process 对象时,如果其中一个进程抛出异常,如何终止所有仍存活的进程?

from multiprocessing import Process
from time import sleep

def func(i):
    """Simulate a worker that runs for ``i`` seconds and fails when ``i == 3``.

    Prints ``begin <i>``, sleeps for ``i`` seconds, then either raises or
    prints ``end <i>``.

    Args:
        i: Worker number; also used as the number of seconds to sleep.

    Raises:
        Exception: When ``i == 3``, to simulate a failing process.
    """
    # print() with one pre-formatted argument emits the same text on
    # Python 2 and Python 3; the print statement is a SyntaxError on Python 3.
    print('begin %s' % (i,))
    sleep(i)
    if i == 3:
        raise Exception
    print('end %s' % (i,))

def terminate_all(procs):
    """Terminate every process in ``procs`` that is still alive.

    Args:
        procs: Iterable of objects exposing ``is_alive()`` and
            ``terminate()`` (e.g. ``multiprocessing.Process``).
    """
    # The generator is lazy, so each liveness check happens right before the
    # corresponding terminate() call, exactly as in a plain loop.
    still_running = (proc for proc in procs if proc.is_alive())
    for proc in still_running:
        proc.terminate()

# Guard the launcher: under the "spawn" start method every child re-imports
# this module, and unguarded top-level code would try to start workers again
# from inside each child.
if __name__ == '__main__':
    processes = []
    for i in range(10):
        p = Process(target=func, args=(i,))
        processes.append(p)
        p.start()  # where do I catch the Exception and call terminate_all?

    # Block until every worker has exited.
    for p in processes:
        p.join()

根据链接问题中 @abranches 的回答,我实现了在发生异常时终止所有进程:子进程捕获异常后通过 Pipe 将其发送给父进程,父进程收到后终止其余仍存活的进程。代码如下:

import multiprocessing
import threading
from time import sleep

def run(i, conn):
    """Process entry point: call ``f(i)`` and report a failure over ``conn``.

    A ``CustomException`` raised by ``f`` is caught and sent through
    ``conn``; any other exception propagates unchanged.

    Args:
        i: Argument forwarded to ``f``.
        conn: Connection object whose ``send()`` receives the exception.
    """
    try:
        f(i)
    except CustomException as failure:
        conn.send(failure)

def f(i):
    """Simulate a job that runs for ``i`` seconds and fails when ``i == 11``.

    Prints ``<i>  called``, sleeps for ``i`` seconds, then either raises or
    prints ``<i>  done``.

    Args:
        i: Job number; also used as the number of seconds to sleep.

    Raises:
        CustomException: When ``i == 11``.
    """
    # print() with one pre-formatted argument works on Python 2 and 3 alike
    # (the print statement is a SyntaxError on Python 3). The two spaces
    # reproduce the original output: the statement's separator plus the
    # leading space of the literal.
    print('%s %s' % (i, ' called'))
    sleep(i)
    if i == 11:
        raise CustomException
    print('%s %s' % (i, ' done'))

class CustomException(Exception):
    """Failure type raised by ``f`` and checked for by ``ProcessManager``."""

class ProcessManager(object):
    """Launch worker processes and terminate all of them if one fails.

    Each process is supervised by its own thread. The thread waits for the
    process to exit and, if the process sent a ``CustomException`` through
    its pipe, terminates every managed process that is still alive.
    """

    def __init__(self):
        # Every Process created by this manager, in launch order.
        self.processes = []
        # Supervisor threads, one per launched process.
        self._threads = []
        # Guards self.processes against concurrent registration/iteration.
        self._lock = threading.Lock()

    def terminate_all(self):
        """Terminate every managed process that is still alive."""
        with self._lock:
            for p in self.processes:
                if p.is_alive():
                    # Single-argument print() behaves the same on
                    # Python 2 and Python 3.
                    print("Terminating %s" % p)
                    p.terminate()

    def launch_proc(self, func, args=(), kwargs=None):
        """Start ``func`` in a new process, supervised by a new thread.

        Args:
            func: Callable run in the child process. It is called with
                ``args`` followed by one extra positional argument: the
                child end of a pipe used to report a ``CustomException``.
            args: Positional arguments for ``func``.
            kwargs: Keyword arguments for ``func``; ``None`` means none.
        """
        # Avoid a shared mutable default: build a fresh dict per call.
        if kwargs is None:
            kwargs = {}
        t = threading.Thread(target=self._proc_thread_runner,
                             args=(func, args, kwargs))
        self._threads.append(t)
        t.start()

    def _proc_thread_runner(self, func, args, kwargs):
        """Run one process to completion and react to a reported failure."""
        parent_conn, child_conn = multiprocessing.Pipe()
        try:
            # tuple(args) also accepts a list of positional arguments.
            p = multiprocessing.Process(target=func,
                                        args=tuple(args) + (child_conn,),
                                        kwargs=kwargs)
            # Register and start under the lock so terminate_all() never
            # sees a registered-but-unstarted process, which would report
            # not alive and be skipped.
            with self._lock:
                self.processes.append(p)
                p.start()
            while p.exitcode is None:
                p.join()
            # Anything the child sent before exiting is still buffered.
            if parent_conn.poll():
                obj = parent_conn.recv()
                if isinstance(obj, CustomException):
                    self.terminate_all()
        finally:
            # Close both ends only after polling; closing the child end
            # earlier would make poll() report EOF as readable data.
            parent_conn.close()
            child_conn.close()

    def wait(self):
        """Block until every supervisor thread has finished."""
        for t in self._threads:
            t.join()

if __name__ == '__main__':
    # Launch 20 workers numbered 0..19, each wrapped by run(), then block
    # until every supervisor thread has finished.
    manager = ProcessManager()
    for job_number in range(20):
        manager.launch_proc(run, (job_number,))
    manager.wait()