Fetching requests in queue concurrently

I've written code that lets me start fetching the next chunk of data from the API while the previous chunk is still being processed.

I want it to always be fetching at most 5 chunks concurrently at any given moment, but the returned data should always be processed in the correct order, even if the last request in the queue completes before the others.

How can I change my code to achieve this?

import asyncio
import urllib.parse
from typing import List, Optional

class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)

        if select_outbound:
            select_outbound = ",".join(select_outbound)

        queue = asyncio.Queue(maxsize=queue_size)

        # TODO: I want to be able to fill the queue with requests that are already executing

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                await queue.put(res)
            await queue.put(None)

        asyncio.create_task(queued_chunks())

        while True:
            res = await queue.get()
            if res is None:
                break
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity
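(The chunks helper used above isn't shown in the question; a minimal version, assuming it simply splits the id list into fixed-size slices, could look like this:)

```python
from typing import Iterator, List, TypeVar

T = TypeVar("T")

def chunks(items: List[T], size: int) -> Iterator[List[T]]:
    """Yield successive slices of at most `size` items from `items`."""
    for start in range(0, len(items), size):
        yield items[start:start + size]
```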

Instead of awaiting the coroutine before putting it in the queue, put the task in the queue and await it later:

class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)

        if select_outbound:
            select_outbound = ",".join(select_outbound)

        queue = asyncio.Queue(maxsize=queue_size)

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                cor = self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                # Start the request immediately, and queue the running
                # task rather than the bare coroutine.
                task = asyncio.create_task(cor)
                await queue.put(task)
            await queue.put(None)

        asyncio.create_task(queued_chunks())

        while True:
            task = await queue.get()
            if task is None:
                break
            res = await task
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity

I'd use two queues here: one for the chunks to be processed, and one for the completed chunks. You can have any number of worker tasks to process chunks, and you can put a size limit on the first queue to limit how many chunks you prefetch. Use a single loop to receive the processed chunks, to ensure they are kept in order (your code already does this).

The trick is to put futures into both queues, one for each chunk to be processed. The worker tasks that do the processing get a chunk-and-future pair, and need to resolve the associated future by setting the POST response as that future's result. The loop that handles the processed chunks awaits each future, so it only moves on to the next chunk once the current chunk has been fully processed. For this to work you put each chunk and its corresponding future into the first queue, for the workers to process, and you put the *same* future into the second queue; these force the chunk results to be processed in order.

So, to summarise:

  • Have two queues:
    1. chunks holds (chunk, future) objects.
    2. completed holds futures, *the same* futures that are paired with the chunks in the other queue.
  • Create "worker" tasks that consume from the chunks queue. If you create 5, then 5 chunks will be processed in parallel. Every time a worker finishes processing a chunk, it sets the result on the corresponding future.
  • Have a "processed chunks" loop; it takes the next future from the completed queue and awaits it. Only when the *specific* chunk associated with that future has been completed (the result is set by a worker task) will it yield the result.

As a rough sketch, it would look something like this:

chunk_queue = asyncio.Queue()
completed_queue = asyncio.Queue()
WORKER_COUNT = queue_size

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        future = asyncio.Future()
        await chunk_queue.put((ids, future))
        await completed_queue.put(future)
    await completed_queue.put(None)

async def worker():
    while True:
        ids, future = await chunk_queue.get()
        try:
            res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                "entityIds": ids,
                "objects": objects,
                "inbound": {
                    "linkTypeIds": select_outbound,
                    "objects": objects,
                } if select_inbound else {},
                "outbound": {
                    "linkTypeIds": select_inbound,
                    "objects": objects,
                } if select_outbound else {},
            })
            res.raise_for_status()
            future.set_result(res)
        except Exception as e:
            future.set_exception(e)
            return

workers = [asyncio.create_task(worker()) for _ in range(WORKER_COUNT)]
chunk_producer = asyncio.create_task(queued_chunks())

try:
    while True:
        future = await completed_queue.get()
        if future is None:
            # all chunks have been processed!
            break
        res = await future
        for entity in res.json():
            yield entity

finally:
    for w in workers:
        w.cancel()
    await asyncio.wait(workers)

If you have to limit the number of chunks *queued up* (and not just the number being processed concurrently), set a maxsize on the chunk_queue queue (a value larger than WORKER_COUNT). Use this to limit memory requirements, for example.
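To see why a bounded queue limits prefetching, here is a toy, self-contained demonstration (all names hypothetical): the producer can only run ahead of the consumer by maxsize items before await queue.put() suspends it.

```python
import asyncio

async def bounded_demo() -> list:
    # Toy stand-in for chunk_queue: maxsize bounds how far the
    # producer can run ahead of the consumer.
    queue: asyncio.Queue = asyncio.Queue(maxsize=3)
    produced: list = []

    async def producer() -> None:
        for i in range(10):
            await queue.put(i)    # suspends once 3 items are waiting
            produced.append(i)

    asyncio.create_task(producer())
    await asyncio.sleep(0)        # let the producer run until it blocks
    prefetched = list(produced)   # exactly maxsize items made it in
    await queue.get()             # consuming one item frees one slot
    await asyncio.sleep(0)
    after_one_get = list(produced)  # producer advanced by one item
    return [prefetched, after_one_get]

print(asyncio.run(bounded_demo()))  # [[0, 1, 2], [0, 1, 2, 3]]
```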

However, if you were to set maxsize to a value *equal to* WORKER_COUNT, you may as well get rid of the worker tasks altogether, and instead put the bodies of the worker loop, as coroutines wrapped in tasks, into the completed-results queue. The asyncio Task class is a subclass of Future, and it automatically sets the future's result when the coroutine it wraps completes. If you are not going to put more chunks into chunk_queue than you have worker tasks, you may as well cut out the middleman and drop chunk_queue altogether. Tasks then go into the completed queue instead of plain futures:

completed_queue = asyncio.Queue(maxsize=queue_size)

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        task = asyncio.create_task(fetch_task(ids))
        await completed_queue.put(task)
    await completed_queue.put(None)

async def fetch_task(ids):
    res = await self.client.post(urllib.parse.quote("entities:fetchdata"),
        json={
            "entityIds": ids,
            "objects": objects,
            "inbound": {
                "linkTypeIds": select_outbound,
                "objects": objects,
            } if select_inbound else {},
            "outbound": {
                "linkTypeIds": select_inbound,
                "objects": objects,
            } if select_outbound else {},
        }
    )
    res.raise_for_status()
    return res

chunk_producer = asyncio.create_task(queued_chunks())

while True:
    task = await completed_queue.get()
    if task is None:
        # all chunks have been processed!
        break
    res = await task
    for entity in res.json():
        yield entity

This version is very close to what you already had; the only difference is that awaiting the client POST coroutine and checking the response status code are put into a separate coroutine, run as a task. You could also put just the self.client.post() coroutine into a task (and so not await it), and leave the response status check to the final queue-processing loop. That is what the other answer here proposed, so I won't repeat it.

Note that this version starts a task before putting it in the queue. The queue is therefore not the only limit on the number of active tasks: there can also be one already-started task waiting at one end for space to be put into the queue (the await completed_queue.put(task) line blocks if the queue is full), and another task already taken out by the queue consumer (fetched by task = await completed_queue.get()). If you need to limit the number of active tasks, subtract 2 from the queue maxsize to set the upper bound.

Also, because tasks can complete concurrently, there could be up to maxsize + 1 *fewer* active tasks, while no more can be started until more space is freed in the queue. Because the first approach queues the *inputs* for the tasks, it doesn't have these problems. You *could* mitigate the issue by using a semaphore rather than a bounded queue size to limit the tasks (acquire a slot before starting a task, and release a slot just before a task returns).
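The semaphore variant could be sketched as follows; this is an illustrative toy, not the actual fetch code, with asyncio.sleep() standing in for the POST request and the slot acquired inside the task body (equivalent to acquiring before start, release on return):

```python
import asyncio

async def semaphore_demo() -> list:
    # A semaphore caps concurrency directly, independent of queue size.
    limit = asyncio.Semaphore(3)       # at most 3 fetches in flight
    active = 0
    peak = 0

    async def fetch_task(i: int) -> int:
        nonlocal active, peak
        async with limit:              # acquire a slot before working
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the POST request
            active -= 1
            return i * 2               # slot released on exiting the block

    tasks = [asyncio.create_task(fetch_task(i)) for i in range(10)]
    results = [await t for t in tasks]  # awaited in submission order
    return [results, peak]             # peak never exceeds 3

print(asyncio.run(semaphore_demo()))
```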

Personally, I'd go with my first proposal, as it gives you separate control over concurrency and chunk prefetching, without the issues the second approach has.