URL 并发获取循环 python

Concurrent URL fetching loops with python

我需要 运行 大约 500 个并发 循环 。每个循环将按顺序获取一个分页的 REST 端点,直到它到达 500 个端点中每个端点的最后一页。其中一些循环只有 5 到 10 页,因此很快就会完成,但其他循环有数百页。

问题是 我需要将此 URL 提取放在一个顺序的阻塞循环中 因为 [=42] 每个页面都必须按顺序提取=] 限制(API 将 抛出错误 如果我先获取第 7 页,然后再获取第 5 页)。因此,这里的并行单元是每个循环,而不是每个 URL 在循环内获取。

任何地方都不会进行繁重的计算。只需获取一个页面,然后将原始内容放入 kafka 主题中。除了依赖多核的多进程之外,我愿意接受任何建议。 AsyncIO、Gevent、多线程...

编辑 1:

实际问题是,如果我使用 aiohttp 在每个循环中异步获取每个页面,我无法保证第 2 页会在第 2 页之后获取1、请求会按照正确的顺序发起,但是绝对不能保证请求一定会按照正确的顺序到达端点并被处理。

编辑 2:

正如用户 4815162342 所指出的,aiohttp 应该可以工作

谢谢!

在 asyncio 中,您可以并行启动与端点一样多的循环,并等待所有循环完成。每个循环将使用 aiohttp 顺序获取端点页面。例如:

async def download_loop(session, endpoint):
    for i in itertools.count(1):
        try:
            async with session.get(endpoint, params={'page': str(i)}) as resp:
                content = await resp.read()
        except aiohttp.ClientResponseError:
            break   # no more pages
        # do something with the response content

async def download(endpoints):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession() as session:
        # Start all loops in parallel and wait for them to finish.
        # This will start as many loops as there are endpoints.
        await asyncio.wait([download_loop(session, endpoint)
                            for endpoint in endpoints])

# for testing:
loop = asyncio.get_event_loop()
loop.run_until_complete(download(['http://endpoint1', 'http://endpoint2', ...]))

生产代码也可能捕获 aiohttp.ClientConnectionError 并重试 URL。