将 tqdm 与 concurrent.futures 一起使用?
Use tqdm with concurrent.futures?
我有一个多线程函数,我想要一个使用 tqdm
的状态栏。有没有一种简单的方法可以用 ThreadPoolExecutor
显示状态栏?令我困惑的是并行化部分。
import concurrent.futures
def f(x):
return f**2
my_iter = range(1000000)
def run(f,my_iter):
with concurrent.futures.ThreadPoolExecutor() as executor:
function = list(executor.map(f, my_iter))
return results
run(f, my_iter) # wrap tqdr around this function?
您可以将 tqdm
包裹在 executor
周围,如下所示以跟踪进度:
list(tqdm(executor.map(f, iter), total=len(iter))
这是你的例子:
import time
import concurrent.futures
from tqdm import tqdm
def f(x):
time.sleep(0.001) # to visualize the progress
return x**2
def run(f, my_iter):
with concurrent.futures.ThreadPoolExecutor() as executor:
results = list(tqdm(executor.map(f, my_iter), total=len(my_iter)))
return results
my_iter = range(100000)
run(f, my_iter)
结果是这样的:
16%|██▏ | 15707/100000 [00:00<00:02, 31312.54it/s]
最短路线,我认为:
with ThreadPoolExecutor(max_workers=20) as executor:
results = list(tqdm(executor.map(myfunc, range(len(my_array))), total=len(my_array)))
已接受答案的问题是 ThreadPoolExecutor.map
函数有义务生成结果,而不是按照它们变得可用的顺序。因此,如果 myfunc
的第一次调用恰好是最后一次完成,则进度条将同时从 0% 变为 100%,并且仅当所有调用都已完成时。更好的方法是将 ThreadPoolExecutor.submit
与 as_completed
:
一起使用
import time
import concurrent.futures
from tqdm import tqdm
def f(x):
time.sleep(0.001) # to visualize the progress
return x**2
def run(f, my_iter):
l = len(my_iter)
with tqdm(total=l) as pbar:
# let's give it some more threads:
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(f, arg): arg for arg in my_iter}
results = {}
for future in concurrent.futures.as_completed(futures):
arg = futures[future]
results[arg] = future.result()
pbar.update(1)
print(321, results[321])
my_iter = range(100000)
run(f, my_iter)
打印:
321 103041
这只是一般的想法。根据 my_iter
的类型,可能无法在不先将其转换为列表的情况下直接对其应用 len
函数。要点是使用 submit
和 as_completed
.
尝试了该示例,但进度条仍然失败,我发现 this post,在简短的使用方式中似乎很有用:
def tqdm_parallel_map(fn, *iterables):
""" use tqdm to show progress"""
executor = concurrent.futures.ProcessPoolExecutor()
futures_list = []
for iterable in iterables:
futures_list += [executor.submit(fn, i) for i in iterable]
for f in tqdm(concurrent.futures.as_completed(futures_list), total=len(futures_list)):
yield f.result()
def multi_cpu_dispatcher_process_tqdm(data_list, single_job_fn):
""" multi cpu dispatcher """
output = []
for result in tqdm_parallel_map(single_job_fn, data_list):
output += result
return output
我有一个多线程函数,我想要一个使用 tqdm
的状态栏。有没有一种简单的方法可以用 ThreadPoolExecutor
显示状态栏?令我困惑的是并行化部分。
import concurrent.futures
def f(x):
return f**2
my_iter = range(1000000)
def run(f,my_iter):
with concurrent.futures.ThreadPoolExecutor() as executor:
function = list(executor.map(f, my_iter))
return results
run(f, my_iter) # wrap tqdr around this function?
您可以将 tqdm
包裹在 executor
周围,如下所示以跟踪进度:
list(tqdm(executor.map(f, iter), total=len(iter))
这是你的例子:
import time
import concurrent.futures
from tqdm import tqdm
def f(x):
time.sleep(0.001) # to visualize the progress
return x**2
def run(f, my_iter):
with concurrent.futures.ThreadPoolExecutor() as executor:
results = list(tqdm(executor.map(f, my_iter), total=len(my_iter)))
return results
my_iter = range(100000)
run(f, my_iter)
结果是这样的:
16%|██▏ | 15707/100000 [00:00<00:02, 31312.54it/s]
最短路线,我认为:
with ThreadPoolExecutor(max_workers=20) as executor:
results = list(tqdm(executor.map(myfunc, range(len(my_array))), total=len(my_array)))
已接受答案的问题是 ThreadPoolExecutor.map
函数有义务生成结果,而不是按照它们变得可用的顺序。因此,如果 myfunc
的第一次调用恰好是最后一次完成,则进度条将同时从 0% 变为 100%,并且仅当所有调用都已完成时。更好的方法是将 ThreadPoolExecutor.submit
与 as_completed
:
import time
import concurrent.futures
from tqdm import tqdm
def f(x):
time.sleep(0.001) # to visualize the progress
return x**2
def run(f, my_iter):
l = len(my_iter)
with tqdm(total=l) as pbar:
# let's give it some more threads:
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(f, arg): arg for arg in my_iter}
results = {}
for future in concurrent.futures.as_completed(futures):
arg = futures[future]
results[arg] = future.result()
pbar.update(1)
print(321, results[321])
my_iter = range(100000)
run(f, my_iter)
打印:
321 103041
这只是一般的想法。根据 my_iter
的类型,可能无法在不先将其转换为列表的情况下直接对其应用 len
函数。要点是使用 submit
和 as_completed
.
尝试了该示例,但进度条仍然失败,我发现 this post,在简短的使用方式中似乎很有用:
def tqdm_parallel_map(fn, *iterables):
""" use tqdm to show progress"""
executor = concurrent.futures.ProcessPoolExecutor()
futures_list = []
for iterable in iterables:
futures_list += [executor.submit(fn, i) for i in iterable]
for f in tqdm(concurrent.futures.as_completed(futures_list), total=len(futures_list)):
yield f.result()
def multi_cpu_dispatcher_process_tqdm(data_list, single_job_fn):
""" multi cpu dispatcher """
output = []
for result in tqdm_parallel_map(single_job_fn, data_list):
output += result
return output