如何控制 python 的 ThreadPoolExecutor 的吞吐速度?
How to control throughput speed of python's ThreadPoolExecutor?
我正在使用 python 的 concurrent.futures
ThreadPoolExecutor 启动异步任务。
按照 this 方法,我使用 tqdm
进度条监控异步调用的进度。
我的代码是这样的:
with concurrent.futures.ThreadPoolExecutor(max_workers = n_jobs) as executor:
future_to_url = {executor.submit(target_function, URL): URL for URL in URL_list}
kwargs = {'total': len(future_to_url), # For tqdm
'unit': 'URL', # For tqdm
'unit_scale': True, # For tqdm
'leave': False, # For tqdm
'miniters': 50, # For tqdm
'desc': 'Scraping Progress'}
for future in tqdm(concurrent.futures.as_completed(future_to_url), **kwargs):
URL = future_to_url[future]
try:
data = future.result() # Concurrent calls
except Exception as exc:
error_handling() # Handle errors
else:
result_handling() # Handle non-errors
控制台输出如下所示:
Scraping Progress: 9%|▉ | 3.35k/36.2k [08:18<1:21:22, 6.72URL/s] # I want < 6/s
Scraping Progress: 9%|▉ | 3.40k/36.2k [08:26<1:21:16, 6.72URL/s] # I want < 6/s
Scraping Progress: 10%|▉ | 3.45k/36.2k [08:30<1:20:40, 6.76URL/s] # I want < 6/s
Scraping Progress: 10%|▉ | 3.50k/36.2k [08:40<1:20:51, 6.73URL/s] # I want < 6/s
Scraping Progress: 10%|▉ | 3.55k/36.2k [08:46<1:20:36, 6.74URL/s] # I want < 6/s
Scraping Progress: 10%|▉ | 3.60k/36.2k [08:52<1:20:17, 6.76URL/s] # I want < 6/s
我知道我可以设置一个 URL 队列并控制其大小,如 here 所述。
但是,我不知道如何控制吞吐速度本身。可以说我想要的不超过 6 URLs/sec。除了在上面的示例中将 time.sleep(n) 投入 target_function()
之外,是否可以通过其他方式将其存档?
如何有效控制ThreadPoolExecutor
在python的concurrent.futures
的吞吐速度?
快回答,没有这种方法。声明池后,如果不先关闭池并重新创建它,就无法更改工作人员的数量。也没有办法让 pool feed tasks 比 workers 的最大速度慢。
您有几个(不是那么理想的)选择。
一种是给worker添加基于全局变量的sleep。然后您可以使用任务完成回调来测量实际速度并相应地调整变量。但如果睡眠是不可能的,这就不管用了。
更好但更麻烦的方法是自己编写任务管理器。在此版本中,您不使用池,而是编写一个 class 来管理工作进程。您生成 "enough" 个工作人员,工作人员监听任务队列。您将以您想要的速度从您的经理那里输入这个队列。您会将队列设置为非常小的最大大小,如果您的管理器检测到队列已满,它会生成另一个工作人员。
但是没有 built-in 功能来做你想做的事,这意味着需要做一些工作,或者你需要重新设计你的程序,这样你就不会一次性将所有任务都提供给池但在那里做一些节流。
我正在使用 python 的 concurrent.futures
ThreadPoolExecutor 启动异步任务。
按照 this 方法,我使用 tqdm
进度条监控异步调用的进度。
我的代码是这样的:
with concurrent.futures.ThreadPoolExecutor(max_workers = n_jobs) as executor:
future_to_url = {executor.submit(target_function, URL): URL for URL in URL_list}
kwargs = {'total': len(future_to_url), # For tqdm
'unit': 'URL', # For tqdm
'unit_scale': True, # For tqdm
'leave': False, # For tqdm
'miniters': 50, # For tqdm
'desc': 'Scraping Progress'}
for future in tqdm(concurrent.futures.as_completed(future_to_url), **kwargs):
URL = future_to_url[future]
try:
data = future.result() # Concurrent calls
except Exception as exc:
error_handling() # Handle errors
else:
result_handling() # Handle non-errors
控制台输出如下所示:
Scraping Progress: 9%|▉ | 3.35k/36.2k [08:18<1:21:22, 6.72URL/s] # I want < 6/s
Scraping Progress: 9%|▉ | 3.40k/36.2k [08:26<1:21:16, 6.72URL/s] # I want < 6/s
Scraping Progress: 10%|▉ | 3.45k/36.2k [08:30<1:20:40, 6.76URL/s] # I want < 6/s
Scraping Progress: 10%|▉ | 3.50k/36.2k [08:40<1:20:51, 6.73URL/s] # I want < 6/s
Scraping Progress: 10%|▉ | 3.55k/36.2k [08:46<1:20:36, 6.74URL/s] # I want < 6/s
Scraping Progress: 10%|▉ | 3.60k/36.2k [08:52<1:20:17, 6.76URL/s] # I want < 6/s
我知道我可以设置一个 URL 队列并控制其大小,如 here 所述。
但是,我不知道如何控制吞吐速度本身。可以说我想要的不超过 6 URLs/sec。除了在上面的示例中将 time.sleep(n) 投入 target_function()
之外,是否可以通过其他方式将其存档?
如何有效控制ThreadPoolExecutor
在python的concurrent.futures
的吞吐速度?
快回答,没有这种方法。声明池后,如果不先关闭池并重新创建它,就无法更改工作人员的数量。也没有办法让 pool feed tasks 比 workers 的最大速度慢。
您有几个(不是那么理想的)选择。
一种是给worker添加基于全局变量的sleep。然后您可以使用任务完成回调来测量实际速度并相应地调整变量。但如果睡眠是不可能的,这就不管用了。
更好但更麻烦的方法是自己编写任务管理器。在此版本中,您不使用池,而是编写一个 class 来管理工作进程。您生成 "enough" 个工作人员,工作人员监听任务队列。您将以您想要的速度从您的经理那里输入这个队列。您会将队列设置为非常小的最大大小,如果您的管理器检测到队列已满,它会生成另一个工作人员。
但是没有 built-in 功能来做你想做的事,这意味着需要做一些工作,或者你需要重新设计你的程序,这样你就不会一次性将所有任务都提供给池但在那里做一些节流。