Python: 根据异常类型终止所有活动进程
Python: terminate all alive processes based on an exception type
使用 multiprocessing 的 Process 对象时,如果其中一个进程抛出异常,我们该如何终止所有仍在运行的进程?
from multiprocessing import Process
from time import sleep
def func(i):
    """Demo worker: sleep for ``i`` seconds and fail when ``i == 3``.

    Prints ``begin <i>`` before sleeping and ``end <i>`` after a successful
    run.

    Raises:
        Exception: when ``i == 3``, after the sleep and before the ``end``
            line is printed.
    """
    # A single pre-formatted argument makes print(...) emit the same text on
    # Python 2 and Python 3; the bare ``print 'begin', i`` statement is a
    # SyntaxError on Python 3.
    print('begin %s' % (i,))
    sleep(i)
    if i == 3:
        raise Exception
    print('end %s' % (i,))
def terminate_all(procs):
    """Call ``terminate()`` on every process in ``procs`` that is still alive."""
    for proc in procs:
        if not proc.is_alive():
            # Already finished (or never running): nothing to stop.
            continue
        proc.terminate()
# Start ten workers, one per value of i in 0..9, keeping every Process object
# so they can be joined (or terminated) later.
processes = []
for i in range(10):
    p = Process(target=func, args=(i,))
    processes.append(p)
    p.start()  # where do I catch the Exception and call terminate_all?

# Block until every worker has exited.
for p in processes:
    p.join()
根据链接问题中 @abranches 的回答,我实现了在发生异常时终止所有进程:子进程通过 Pipe 把捕获到的异常发送给父进程,父进程收到后终止其余仍在运行的进程。这是代码:
import multiprocessing
import threading
from time import sleep
def run(i, conn):
    """Call ``f(i)`` and report a ``CustomException`` through ``conn``.

    A ``CustomException`` raised by ``f`` is caught and sent with
    ``conn.send`` instead of propagating; any other exception is not
    intercepted here.
    """
    try:
        f(i)
    except CustomException as error:
        conn.send(error)
def f(i):
    """Demo workload: sleep for ``i`` seconds and fail when ``i == 11``.

    Prints ``<i>  called`` before sleeping and ``<i>  done`` after a
    successful run.

    Raises:
        CustomException: when ``i == 11``, after the sleep and before the
            ``done`` line is printed.
    """
    # The Python 2 statement ``print i, ' called'`` is a SyntaxError on
    # Python 3. Formatting into one string keeps the output identical on both
    # versions, including the two spaces (separator + the literal's leading
    # space).
    print('%s  called' % (i,))
    sleep(i)
    if i == 11:
        raise CustomException
    print('%s  done' % (i,))
class CustomException(Exception):
    """Failure signal of the demo: raised by ``f`` and forwarded by ``run``."""
class ProcessManager(object):
    """Run functions in child processes and stop them all on a failure.

    Each launched function gets its own supervising thread. The thread
    creates a pipe, starts the child process with the pipe's child end
    appended to its positional arguments, and waits for the child to exit.
    If the child sent a ``CustomException`` instance through the pipe, every
    registered process that is still alive is terminated.
    """

    def __init__(self):
        self.processes = []   # every Process created by this manager
        self._threads = []    # one supervising thread per launched process
        # Guards self.processes, which supervising threads append to while
        # another thread may be iterating it in terminate_all().
        self._lock = threading.Lock()

    def terminate_all(self):
        """Terminate every registered process that is still alive."""
        with self._lock:
            for p in self.processes:
                if p.is_alive():
                    # Single-argument print(...) behaves the same on
                    # Python 2 and Python 3.
                    print("Terminating %s" % p)
                    p.terminate()

    def launch_proc(self, func, args=(), kwargs=None):
        """Start ``func`` in a new process, supervised by a new thread.

        Args:
            func: callable run in the child process. It is called with
                ``args`` plus one extra trailing positional argument: the
                child end of a pipe on which it may send a
                ``CustomException``.
            args: positional arguments for ``func``.
            kwargs: keyword arguments for ``func``; ``None`` means none.
        """
        # A fresh dict per call: a ``{}`` default would be one object shared
        # by every call that omits ``kwargs``.
        if kwargs is None:
            kwargs = {}
        t = threading.Thread(target=self._proc_thread_runner,
                             args=(func, args, kwargs))
        self._threads.append(t)
        t.start()

    def _proc_thread_runner(self, func, args, kwargs):
        """Thread body: run one child process and react to its failure."""
        parent_conn, child_conn = multiprocessing.Pipe()
        try:
            args = tuple(args) + (child_conn,)
            p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
            # Register under the same lock terminate_all() iterates with.
            with self._lock:
                self.processes.append(p)
            p.start()
            while p.exitcode is None:
                p.join()
            # Anything the child sent before exiting is still buffered in
            # the pipe; only a CustomException triggers the shutdown.
            if parent_conn.poll():
                obj = parent_conn.recv()
                if isinstance(obj, CustomException):
                    self.terminate_all()
        finally:
            # Release this process's handles to both pipe ends; previously
            # they stayed open for the lifetime of the manager.
            child_conn.close()
            parent_conn.close()

    def wait(self):
        """Block until every supervising thread has finished."""
        for t in self._threads:
            t.join()
if __name__ == '__main__':
    # Launch twenty workers (i = 0..19), then wait for every supervising
    # thread to finish.
    proc_manager = ProcessManager()
    for i in range(20):
        proc_manager.launch_proc(run, (i,))
    proc_manager.wait()
使用 multiprocessing 的 Process 对象时,如果其中一个进程抛出异常,我们该如何终止所有仍在运行的进程?
from multiprocessing import Process
from time import sleep
def func(i):
    """Demo worker: sleep for ``i`` seconds and fail when ``i == 3``.

    Prints ``begin <i>`` before sleeping and ``end <i>`` after a successful
    run.

    Raises:
        Exception: when ``i == 3``, after the sleep and before the ``end``
            line is printed.
    """
    # A single pre-formatted argument makes print(...) emit the same text on
    # Python 2 and Python 3; the bare ``print 'begin', i`` statement is a
    # SyntaxError on Python 3.
    print('begin %s' % (i,))
    sleep(i)
    if i == 3:
        raise Exception
    print('end %s' % (i,))
def terminate_all(procs):
    """Call ``terminate()`` on every process in ``procs`` that is still alive."""
    for proc in procs:
        if not proc.is_alive():
            # Already finished (or never running): nothing to stop.
            continue
        proc.terminate()
# Start ten workers, one per value of i in 0..9, keeping every Process object
# so they can be joined (or terminated) later.
processes = []
for i in range(10):
    p = Process(target=func, args=(i,))
    processes.append(p)
    p.start()  # where do I catch the Exception and call terminate_all?

# Block until every worker has exited.
for p in processes:
    p.join()
根据链接问题中 @abranches 的回答,我实现了在发生异常时终止所有进程:子进程通过 Pipe 把捕获到的异常发送给父进程,父进程收到后终止其余仍在运行的进程。这是代码:
import multiprocessing
import threading
from time import sleep
def run(i, conn):
    """Call ``f(i)`` and report a ``CustomException`` through ``conn``.

    A ``CustomException`` raised by ``f`` is caught and sent with
    ``conn.send`` instead of propagating; any other exception is not
    intercepted here.
    """
    try:
        f(i)
    except CustomException as error:
        conn.send(error)
def f(i):
    """Demo workload: sleep for ``i`` seconds and fail when ``i == 11``.

    Prints ``<i>  called`` before sleeping and ``<i>  done`` after a
    successful run.

    Raises:
        CustomException: when ``i == 11``, after the sleep and before the
            ``done`` line is printed.
    """
    # The Python 2 statement ``print i, ' called'`` is a SyntaxError on
    # Python 3. Formatting into one string keeps the output identical on both
    # versions, including the two spaces (separator + the literal's leading
    # space).
    print('%s  called' % (i,))
    sleep(i)
    if i == 11:
        raise CustomException
    print('%s  done' % (i,))
class CustomException(Exception):
    """Failure signal of the demo: raised by ``f`` and forwarded by ``run``."""
class ProcessManager(object):
    """Run functions in child processes and stop them all on a failure.

    Each launched function gets its own supervising thread. The thread
    creates a pipe, starts the child process with the pipe's child end
    appended to its positional arguments, and waits for the child to exit.
    If the child sent a ``CustomException`` instance through the pipe, every
    registered process that is still alive is terminated.
    """

    def __init__(self):
        self.processes = []   # every Process created by this manager
        self._threads = []    # one supervising thread per launched process
        # Guards self.processes, which supervising threads append to while
        # another thread may be iterating it in terminate_all().
        self._lock = threading.Lock()

    def terminate_all(self):
        """Terminate every registered process that is still alive."""
        with self._lock:
            for p in self.processes:
                if p.is_alive():
                    # Single-argument print(...) behaves the same on
                    # Python 2 and Python 3.
                    print("Terminating %s" % p)
                    p.terminate()

    def launch_proc(self, func, args=(), kwargs=None):
        """Start ``func`` in a new process, supervised by a new thread.

        Args:
            func: callable run in the child process. It is called with
                ``args`` plus one extra trailing positional argument: the
                child end of a pipe on which it may send a
                ``CustomException``.
            args: positional arguments for ``func``.
            kwargs: keyword arguments for ``func``; ``None`` means none.
        """
        # A fresh dict per call: a ``{}`` default would be one object shared
        # by every call that omits ``kwargs``.
        if kwargs is None:
            kwargs = {}
        t = threading.Thread(target=self._proc_thread_runner,
                             args=(func, args, kwargs))
        self._threads.append(t)
        t.start()

    def _proc_thread_runner(self, func, args, kwargs):
        """Thread body: run one child process and react to its failure."""
        parent_conn, child_conn = multiprocessing.Pipe()
        try:
            args = tuple(args) + (child_conn,)
            p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
            # Register under the same lock terminate_all() iterates with.
            with self._lock:
                self.processes.append(p)
            p.start()
            while p.exitcode is None:
                p.join()
            # Anything the child sent before exiting is still buffered in
            # the pipe; only a CustomException triggers the shutdown.
            if parent_conn.poll():
                obj = parent_conn.recv()
                if isinstance(obj, CustomException):
                    self.terminate_all()
        finally:
            # Release this process's handles to both pipe ends; previously
            # they stayed open for the lifetime of the manager.
            child_conn.close()
            parent_conn.close()

    def wait(self):
        """Block until every supervising thread has finished."""
        for t in self._threads:
            t.join()
if __name__ == '__main__':
    # Launch twenty workers (i = 0..19), then wait for every supervising
    # thread to finish.
    proc_manager = ProcessManager()
    for i in range(20):
        proc_manager.launch_proc(run, (i,))
    proc_manager.wait()