Multiprocessing with map_async and a progress bar
Is it possible to get a progress bar when using map_async from multiprocessing?
Toy example:
from multiprocessing import Pool
import tqdm
def f(x):
    print(x)
    return x*x
n_job = 4
with Pool(processes=n_job) as pool:
    results = pool.map_async(f, range(10)).get()
print(results)
Something like this:
data = []
with Pool(processes=10) as pool:
    for d in tqdm.tqdm(
            pool.imap(f, range(10)),
            total=10):
        data.append(d)
I guess this is the way:
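from multiprocessing import Pool
import tqdm
def f(x):
    return x*x
n_job = 4
data = []
with Pool(processes=n_job) as pool:
    # Note: .get() blocks until every task has finished, so the bar
    # only advances afterwards, while iterating the finished list.
    for d in tqdm.tqdm(
            pool.map_async(f, range(10)).get(),
            total=10):
        data.append(d)
print(data)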
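I can think of a couple of ways to achieve what you want:
- Use apply_async with the callback argument to update the progress bar as each result becomes available.
- Use imap and update the progress bar as you iterate over the results.
There is a slight catch with imap: the results must be returned in task-submission order, which is of course what you want. But that order does not necessarily reflect the order in which the submitted tasks complete, so the progress bar may not be updated as frequently as it otherwise would. I will show that solution first, though, since it is the simplest and probably adequate: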
from multiprocessing import Pool
import tqdm
def f(x):
    import time
    time.sleep(1) # for demo purposes
    return x*x
# Required by Windows:
if __name__ == '__main__':
    pool_size = 4
    results = []
    with Pool(processes=pool_size) as pool:
        with tqdm.tqdm(total=10) as pbar:
            for result in pool.imap(f, range(10)):
                results.append(result)
                pbar.update()
    print(results)
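If the ordering caveat above matters, one possible variation (not part of the original answer) is to swap in imap_unordered, which yields results in completion order, and carry each task's index along so the submission order can be restored afterwards. This is only a sketch: the helper names indexed_square and run are made up for illustration, and the 0.1 s sleep stands in for real work.

```python
from multiprocessing import Pool
import time

import tqdm


def indexed_square(pair):
    """Worker: take (index, x) and return (index, x*x).

    Returning the index lets the caller put results back in order.
    """
    index, x = pair
    time.sleep(0.1)  # stand-in for real work, for demo purposes
    return index, x * x


def run(values, pool_size=4):
    """Square `values` in a pool, updating the bar as each task completes."""
    results = [None] * len(values)
    with Pool(processes=pool_size) as pool:
        with tqdm.tqdm(total=len(values)) as pbar:
            # imap_unordered yields each result as soon as it is ready,
            # regardless of the order in which tasks were submitted.
            for index, value in pool.imap_unordered(indexed_square,
                                                    enumerate(values)):
                results[index] = value  # restore submission order
                pbar.update()
    return results


# Required by Windows:
if __name__ == '__main__':
    print(run(list(range(10))))
```

The trade-off is a slightly more awkward worker signature in exchange for a bar that moves whenever any task finishes.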
The solution using apply_async:
from multiprocessing import Pool
import tqdm
def f(x):
    import time
    time.sleep(1) # for demo purposes
    return x*x
# Required by Windows:
if __name__ == '__main__':
    def my_callback(_):
        # We don't care about the actual result.
        # Just update the progress bar:
        pbar.update()
    pool_size = 4
    with Pool(processes=pool_size) as pool:
        with tqdm.tqdm(total=10) as pbar:
            async_results = [pool.apply_async(f, args=(x,), callback=my_callback) for x in range(10)]
            results = [async_result.get() for async_result in async_results]
    print(results)
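As for map_async itself, which is what the question asked about: its callback is invoked a single time with the complete result list once every task has finished, so it cannot drive a per-item progress bar on its own. The following minimal sketch illustrates that behaviour; the names square and run_map_async are hypothetical and exist only for this demonstration.

```python
from multiprocessing import Pool
import time


def square(x):
    """Worker: return x*x after a short pause."""
    time.sleep(0.1)  # stand-in for real work, for demo purposes
    return x * x


def run_map_async(values, pool_size=4):
    """Run map_async and count how many times its callback is invoked."""
    callback_args = []
    with Pool(processes=pool_size) as pool:
        # The callback gets the whole result list, not individual items.
        async_result = pool.map_async(square, values,
                                      callback=callback_args.append)
        results = async_result.get()  # blocks until all tasks are done
    return results, len(callback_args)


# Required by Windows:
if __name__ == '__main__':
    results, n_calls = run_map_async(range(10))
    print(results)
    print("callback invocations:", n_calls)
```

Because the callback count stays at one no matter how many items are mapped, per-item progress needs imap, imap_unordered, or one apply_async call per task, as in the solutions above.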