python ThreadPoolExecutor 在我得到结果时关闭所有线程

python ThreadPoolExecutor close all threads when I get a result

我是运行一个webscraper class,方法名称是self.get_with_random_proxy_using_chain

我正在尝试向同一个 url 发送多线程调用,并且希望一旦有来自任何线程的结果,方法 returns 就会响应并关闭其他仍处于活动状态的线程。

到目前为止我的代码看起来像这样(可能很幼稚):

from concurrent.futures import ThreadPoolExecutor, as_completed
# class initiation etc

max_workers = cpu_count() * 5
urls = [url_to_open] * 50

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_url=[]
    for url in urls: # i had to do a loop to include sleep not to overload the proxy server
        future_to_url.append(executor.submit(self.get_with_random_proxy_using_chain,
                                     url,
                                     timeout,
                                     update_proxy_score,
                                     unwanted_keywords,
                                     unwanted_status_codes,
                                     random_universe_size,
                                     file_path_to_save_streamed_content))
        sleep(0.5)

    for future in as_completed(future_to_url):
            if future.result() is not None:
                return future.result()

但它运行所有线程。

有没有办法在第一个 future 完成后关闭所有线程。 我正在使用 windows 和 python 3.7x

到目前为止我找到了这个link,但我没能让它工作(pogram 仍然运行了很长时间)。

据我所知,运行 期货不能取消。 written 对此有很多讨论。甚至还有一些解决方法。

但我建议您仔细查看 asyncio 模块。它非常适合此类任务。

下面是一个简单的例子,当有多个并发请求时,收到第一个结果后,其余的被取消。

import asyncio
from typing import Set

from aiohttp import ClientSession


async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()


async def wait_for_first_response(tasks):
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for p in pending:
        p.cancel()
    return done.pop().result()


async def request_one_of(*urls):
    tasks = set()
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.create_task(fetch(url, session))
            tasks.add(task)

        return await wait_for_first_response(tasks)


async def main():
    response = await request_one_of("https://wikipedia.org", "https://apple.com")
    print(response)

asyncio.run(main())