asyncio:任务只能在前一个任务达到预定义阶段时才开始吗?
asyncio: can a task only start when previous task reach a pre-defined stage?
我从 asyncio 开始,我希望将其应用于以下问题:
- 数据被分割成块。
- 第一个压缩块。
- 然后将压缩后的块写入文件。
- 一个文件用于所有块,所以我需要一个一个地处理它们。
with open('my_file', 'w+b') as f:
for chunk in chunks:
compress_chunk(ch)
f.write(ch)
从这个上下文来看,为了运行这个过程更快,一旦当前迭代的write
步开始,下一次迭代的compress
步是否也被触发?
我可以用 asyncio
保持类似的 for
循环结构吗?如果是的话,你能分享一些关于这方面的建议吗?
我猜另一种并行 运行 的方法是使用 ProcessPoolExecutor
并将 compress
阶段与 write
阶段完全分开。这意味着首先压缩不同执行程序中的所有块。
只有当所有块都被压缩后,才开始写入步骤。
但是我想用 asyncio
1st 研究第一种方法,如果它有意义的话。
在此先感谢您的帮助。
最佳
您可以使用生产者-消费者模型来做到这一点。只要有一个生产者和一个消费者,你就会有正确的顺序。对于您的用例,这就是您将从中受益的全部。此外,您应该使用 aioFiles
库。标准文件 IO 将主要阻塞您的主 compression/producer 线程,您不会看到太多加速。尝试这样的事情:
async def produce(queue, chunks):
for chunk in chunks:
compress_chunk(ch)
await queue.put(i)
async def consume(queue):
with async with aiofiles.open('my_file', 'w') as f:
while True:
compressed_chunk = await Q.get()
await f.write(b'Hello, World!')
queue.task_done()
async def main():
queue = asyncio.Queue()
producer = asyncio.create_task(producer(queue, chunks))
consumer = asyncio.create_task(consumer(queue))
# wait for the producer to finish
await producer
# wait for the consumer to finish processing and cancel it
await queue.join()
consumer.cancel()
asyncio.run(main())
https://github.com/Tinche/aiofiles
我从 asyncio 开始,我希望将其应用于以下问题:
- 数据被分割成块。
- 第一个压缩块。
- 然后将压缩后的块写入文件。
- 一个文件用于所有块,所以我需要一个一个地处理它们。
with open('my_file', 'w+b') as f:
for chunk in chunks:
compress_chunk(ch)
f.write(ch)
从这个上下文来看,为了运行这个过程更快,一旦当前迭代的write
步开始,下一次迭代的compress
步是否也被触发?
我可以用 asyncio
保持类似的 for
循环结构吗?如果是的话,你能分享一些关于这方面的建议吗?
我猜另一种并行 运行 的方法是使用 ProcessPoolExecutor
并将 compress
阶段与 write
阶段完全分开。这意味着首先压缩不同执行程序中的所有块。
只有当所有块都被压缩后,才开始写入步骤。
但是我想用 asyncio
1st 研究第一种方法,如果它有意义的话。
在此先感谢您的帮助。 最佳
您可以使用生产者-消费者模型来做到这一点。只要有一个生产者和一个消费者,你就会有正确的顺序。对于您的用例,这就是您将从中受益的全部。此外,您应该使用 aioFiles
库。标准文件 IO 将主要阻塞您的主 compression/producer 线程,您不会看到太多加速。尝试这样的事情:
async def produce(queue, chunks):
for chunk in chunks:
compress_chunk(ch)
await queue.put(i)
async def consume(queue):
with async with aiofiles.open('my_file', 'w') as f:
while True:
compressed_chunk = await Q.get()
await f.write(b'Hello, World!')
queue.task_done()
async def main():
queue = asyncio.Queue()
producer = asyncio.create_task(producer(queue, chunks))
consumer = asyncio.create_task(consumer(queue))
# wait for the producer to finish
await producer
# wait for the consumer to finish processing and cancel it
await queue.join()
consumer.cancel()
asyncio.run(main())
https://github.com/Tinche/aiofiles