asyncio:任务只能在前一个任务达到预定义阶段时才开始吗?

asyncio: can a task only start when previous task reach a pre-defined stage?

我从 asyncio 开始,我希望将其应用于以下问题:

with open('my_file', 'w+b') as f:
    for chunk in chunks:
        compress_chunk(ch)
        f.write(ch)

从这个上下文来看,为了运行这个过程更快,一旦当前迭代的write步开始,下一次迭代的compress步是否也被触发?

我可以用 asyncio 保持类似的 for 循环结构吗?如果是的话,你能分享一些关于这方面的建议吗?

我猜另一种并行 运行 的方法是使用 ProcessPoolExecutor 并将 compress 阶段与 write 阶段完全分开。这意味着首先压缩不同执行程序中的所有块。

只有当所有块都被压缩后,才开始写入步骤。 但是我想用 asyncio 1st 研究第一种方法,如果它有意义的话。

在此先感谢您的帮助。 最佳

您可以使用生产者-消费者模型来做到这一点。只要有一个生产者和一个消费者,你就会有正确的顺序。对于您的用例,这就是您将从中受益的全部。此外,您应该使用 aioFiles 库。标准文件 IO 将主要阻塞您的主 compression/producer 线程,您不会看到太多加速。尝试这样的事情:

async def produce(queue, chunks):
    for chunk in chunks:
        compress_chunk(ch)
        await queue.put(i)


async def consume(queue):
    with async with aiofiles.open('my_file', 'w') as f:
        while True:
            compressed_chunk = await Q.get()
            await f.write(b'Hello, World!')
            queue.task_done()


async def main():
    queue = asyncio.Queue()

    producer = asyncio.create_task(producer(queue, chunks))
    consumer = asyncio.create_task(consumer(queue))

    # wait for the producer to finish
    await producer

    # wait for the consumer to finish processing and cancel it
    await queue.join()
    consumer.cancel()
 
asyncio.run(main())

https://github.com/Tinche/aiofiles