Python 多进程池。当其中一个工作进程确定不需要完成更多工作时如何退出脚本?
Python Multiprocess Pool. How to exit the script when one of the worker process determines no more work needs to be done?
mp.set_start_method('spawn')
total_count = Counter(0)
pool = mp.Pool(initializer=init, initargs=(total_count,), processes=num_proc)
pool.map(part_crack_helper, product(seed_str, repeat=4))
pool.close()
pool.join()
所以我有一个工作进程池可以做一些工作。它只需要找到一个解决方案。因此,当其中一个工作进程找到解决方案时,我想停止一切。
我想到的一种方法就是调用 sys.exit()。但是,这似乎无法正常工作,因为其他进程是 运行.
另一种方法是检查每个进程调用的 return 值(part_crack_helper 函数的 return 值)并在该进程上调用终止。但是,我不知道如何在使用地图功能时做到这一点。
我该如何实现?
您可以使用来自 Pool.apply_async
的回调。
像这样的东西应该适合你。
from multiprocessing import Pool
def part_crack_helper(args):
solution = do_job(args)
if solution:
return True
else:
return False
class Worker():
def __init__(self, workers, initializer, initargs):
self.pool = Pool(processes=workers,
initializer=initializer,
initargs=initargs)
def callback(self, result):
if result:
print("Solution found! Yay!")
self.pool.terminate()
def do_job(self):
for args in product(seed_str, repeat=4):
self.pool.apply_async(part_crack_helper,
args=args,
callback=self.callback)
self.pool.close()
self.pool.join()
print("good bye")
w = Worker(num_proc, init, [total_count])
w.do_job()
如果您可以使用其他库,您可以使用 Pebble 通过以下方式解决它。此解决方案的优点是您可以额外指定超时。这意味着如果有一个成功的工人或时间用完则程序结束:
from pebble import ProcessPool, ProcessExpired
from concurrent.futures import TimeoutError
import time
pool = ProcessPool()
def my_function(args):
print("running " + str(args))
time.sleep((args + 1) * 30)
print("process won:" + str(args))
return True
start_time = time.time()
future = pool.map(my_function, range(4), timeout=65)
iterator = future.result()
while True:
try:
result = next(iterator)
if result:
pool.stop()
pool.join(timeout=0)
break
except StopIteration:
break
except TimeoutError as error:
print("function took longer than %d seconds" % error.args[1])
except ProcessExpired as error:
print("%s. Exit code: %d" % (error, error.exitcode))
except Exception as error:
print("function raised %s" % error)
print(error.traceback) # Python's traceback of remote process
print("whole time: " + str(time.time() - start_time))
mp.set_start_method('spawn')
total_count = Counter(0)
pool = mp.Pool(initializer=init, initargs=(total_count,), processes=num_proc)
pool.map(part_crack_helper, product(seed_str, repeat=4))
pool.close()
pool.join()
所以我有一个工作进程池可以做一些工作。它只需要找到一个解决方案。因此,当其中一个工作进程找到解决方案时,我想停止一切。
我想到的一种方法就是调用 sys.exit()。但是,这似乎无法正常工作,因为其他进程是 运行.
另一种方法是检查每个进程调用的 return 值(part_crack_helper 函数的 return 值)并在该进程上调用终止。但是,我不知道如何在使用地图功能时做到这一点。
我该如何实现?
您可以使用来自 Pool.apply_async
的回调。
像这样的东西应该适合你。
from multiprocessing import Pool
def part_crack_helper(args):
solution = do_job(args)
if solution:
return True
else:
return False
class Worker():
def __init__(self, workers, initializer, initargs):
self.pool = Pool(processes=workers,
initializer=initializer,
initargs=initargs)
def callback(self, result):
if result:
print("Solution found! Yay!")
self.pool.terminate()
def do_job(self):
for args in product(seed_str, repeat=4):
self.pool.apply_async(part_crack_helper,
args=args,
callback=self.callback)
self.pool.close()
self.pool.join()
print("good bye")
w = Worker(num_proc, init, [total_count])
w.do_job()
如果您可以使用其他库,您可以使用 Pebble 通过以下方式解决它。此解决方案的优点是您可以额外指定超时。这意味着如果有一个成功的工人或时间用完则程序结束:
from pebble import ProcessPool, ProcessExpired
from concurrent.futures import TimeoutError
import time
pool = ProcessPool()
def my_function(args):
print("running " + str(args))
time.sleep((args + 1) * 30)
print("process won:" + str(args))
return True
start_time = time.time()
future = pool.map(my_function, range(4), timeout=65)
iterator = future.result()
while True:
try:
result = next(iterator)
if result:
pool.stop()
pool.join(timeout=0)
break
except StopIteration:
break
except TimeoutError as error:
print("function took longer than %d seconds" % error.args[1])
except ProcessExpired as error:
print("%s. Exit code: %d" % (error, error.exitcode))
except Exception as error:
print("function raised %s" % error)
print(error.traceback) # Python's traceback of remote process
print("whole time: " + str(time.time() - start_time))