每当子进程在 Python 3 的多处理池中完成时打印更新行
Printing an update line whenever a subprocess finishes in Python 3's multiprocessing Pool
我正在使用 Python 的 multiprocessing
库来处理带有内置 map()
方法的输入列表。这是相关的代码段:
subp_pool = Pool(self.subprocesses)
cases = subp_pool.map(self.get_case, input_list)
return cases
要并行运行的函数是self.get_case()
,输入列表是input_list
。
我希望按以下格式将进度提示打印到标准输出:
Working (25/100 cases processed)
如何更新包含池的 class 中的局部变量,以便每当子进程完成时,变量都会递增 1(然后打印到标准输出)?
无法使用 multiprocessing.map
执行此操作,因为它在完成所有任务之前不会向主进程发出任何警报。但是,您可以通过将 apply_async
与 callback
关键字参数结合使用来获得类似的行为:
from multiprocessing.dummy import Pool
from functools import partial
import time
class Test(object):
def __init__(self):
self.count = 0
self.threads = 4
def get_case(self, x):
time.sleep(x)
def callback(self, total, x):
self.count += 1
print("Working ({}/{}) cases processed.".format(self.count, total))
def do_async(self):
thread_pool = Pool(self.threads)
input_list = range(5)
callback = partial(self.callback, len(input_list))
tasks = [thread_pool.apply_async(self.get_case, (x,),
callback=callback) for x in input_list]
return [task.get() for task in tasks]
if __name__ == "__main__":
t = Test()
t.do_async()
从 get_case() 方法调用 print_data() 就完成了。
from threading import Lock
Class A(object):
def __init__(self):
self.mutex = Lock()
self.count = 0
def print_data(self):
self.mutex.acquire()
try:
self.count += 1
print('Working (' + str(self.count) + 'cases processed)')
finally:
self.mutex.release()
我正在使用 Python 的 multiprocessing
库来处理带有内置 map()
方法的输入列表。这是相关的代码段:
subp_pool = Pool(self.subprocesses)
cases = subp_pool.map(self.get_case, input_list)
return cases
要并行运行的函数是self.get_case()
,输入列表是input_list
。
我希望按以下格式将进度提示打印到标准输出:
Working (25/100 cases processed)
如何更新包含池的 class 中的局部变量,以便每当子进程完成时,变量都会递增 1(然后打印到标准输出)?
无法使用 multiprocessing.map
执行此操作,因为它在完成所有任务之前不会向主进程发出任何警报。但是,您可以通过将 apply_async
与 callback
关键字参数结合使用来获得类似的行为:
from multiprocessing.dummy import Pool
from functools import partial
import time
class Test(object):
def __init__(self):
self.count = 0
self.threads = 4
def get_case(self, x):
time.sleep(x)
def callback(self, total, x):
self.count += 1
print("Working ({}/{}) cases processed.".format(self.count, total))
def do_async(self):
thread_pool = Pool(self.threads)
input_list = range(5)
callback = partial(self.callback, len(input_list))
tasks = [thread_pool.apply_async(self.get_case, (x,),
callback=callback) for x in input_list]
return [task.get() for task in tasks]
if __name__ == "__main__":
t = Test()
t.do_async()
从 get_case() 方法调用 print_data() 就完成了。
from threading import Lock
Class A(object):
def __init__(self):
self.mutex = Lock()
self.count = 0
def print_data(self):
self.mutex.acquire()
try:
self.count += 1
print('Working (' + str(self.count) + 'cases processed)')
finally:
self.mutex.release()