为什么 asyncio queue await get() 阻塞?
Why is asyncio queue await get() blocking?
为什么 await queue.get() 阻塞?
import asyncio
async def producer(queue, item):
await queue.put(item)
async def consumer(queue):
val = await queue.get()
print("val = %d" % val)
async def main():
queue = asyncio.Queue()
await consumer(queue)
await producer(queue, 1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
如果我在 consumer() 之前调用 producer(),它工作正常
也就是说,下面的工作正常。
async def main():
queue = asyncio.Queue()
await producer(queue, 1)
await consumer(queue)
为什么 await queue.get() 将控制权交还给事件循环,以便生产者协程可以 运行 填充队列这样 queue.get() 就可以 return.
这是因为你调用了 await consumer(queue)
,这意味着下一行 (procuder
) 直到 consumer
returns 才会被调用,当然它永远不会调用,因为还没有人生产
查看文档中的示例,看看他们如何使用它:https://docs.python.org/3/library/asyncio-queue.html#examples
另一个简单的例子:
import asyncio
import random
async def produce(queue, n):
for x in range(1, n + 1):
# produce an item
print('producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
item = str(x)
# put the item in the queue
await queue.put(item)
# indicate the producer is done
await queue.put(None)
async def consume(queue):
while True:
# wait for an item from the producer
item = await queue.get()
if item is None:
# the producer emits None to indicate that it is done
break
# process the item
print('consuming item {}...'.format(item))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()
你应该使用 .run_until_complete()
和 .gather()
这是您更新后的代码:
import asyncio
async def producer(queue, item):
await queue.put(item)
async def consumer(queue):
val = await queue.get()
print("val = %d" % val)
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.gather(consumer(queue), producer(queue, 1))
)
loop.close()
输出:
val = 1
您也可以将 .run_forever()
与 .create_task()
一起使用
因此您的代码片段将是:
import asyncio
async def producer(queue, item):
await queue.put(item)
async def consumer(queue):
val = await queue.get()
print("val = %d" % val)
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.create_task(consumer(queue))
loop.create_task(producer(queue, 1))
try:
loop.run_forever()
except KeyboardInterrupt:
loop.close()
输出:
val = 1
您需要并行启动消费者和生产者,例如像这样定义 main
:
async def main():
queue = asyncio.Queue()
await asyncio.gather(consumer(queue), producer(queue, 1))
如果由于某种原因您不能使用 gather
,那么您可以这样做(相当于):
async def main():
queue = asyncio.Queue()
asyncio.create_task(consumer(queue))
asyncio.create_task(producer(queue, 1))
await asyncio.sleep(100) # what your program actually does
Why isn't await queue.get()
yielding control back to the event loop so that the producer coroutine can run which will populate the queue so that queue.get()
can return.
await queue.get()
是 将控制权交还给事件循环。但是 await 意味着 wait,所以当你的 main
协程说 await consumer(queue)
时,这意味着 "resume me once consumer(queue)
has completed." 因为 consumer(queue)
本身就是在等待某人生产某物,你有一个经典的死锁案例。
仅因为您的生产者是 one-shot,所以颠倒顺序才有效,因此它会立即 returns 给调用者。如果您的生产者碰巧等待外部源(例如套接字),您也会在那里遇到死锁。无论 producer
和 consumer
是如何写的,并行启动它们都可以避免死锁。
为什么 await queue.get() 阻塞?
import asyncio
async def producer(queue, item):
await queue.put(item)
async def consumer(queue):
val = await queue.get()
print("val = %d" % val)
async def main():
queue = asyncio.Queue()
await consumer(queue)
await producer(queue, 1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
如果我在 consumer() 之前调用 producer(),它工作正常 也就是说,下面的工作正常。
async def main():
queue = asyncio.Queue()
await producer(queue, 1)
await consumer(queue)
为什么 await queue.get() 将控制权交还给事件循环,以便生产者协程可以 运行 填充队列这样 queue.get() 就可以 return.
这是因为你调用了 await consumer(queue)
,这意味着下一行 (procuder
) 直到 consumer
returns 才会被调用,当然它永远不会调用,因为还没有人生产
查看文档中的示例,看看他们如何使用它:https://docs.python.org/3/library/asyncio-queue.html#examples
另一个简单的例子:
import asyncio
import random
async def produce(queue, n):
for x in range(1, n + 1):
# produce an item
print('producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
item = str(x)
# put the item in the queue
await queue.put(item)
# indicate the producer is done
await queue.put(None)
async def consume(queue):
while True:
# wait for an item from the producer
item = await queue.get()
if item is None:
# the producer emits None to indicate that it is done
break
# process the item
print('consuming item {}...'.format(item))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()
你应该使用 .run_until_complete()
和 .gather()
这是您更新后的代码:
import asyncio
async def producer(queue, item):
await queue.put(item)
async def consumer(queue):
val = await queue.get()
print("val = %d" % val)
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.gather(consumer(queue), producer(queue, 1))
)
loop.close()
输出:
val = 1
您也可以将 .run_forever()
与 .create_task()
因此您的代码片段将是:
import asyncio
async def producer(queue, item):
await queue.put(item)
async def consumer(queue):
val = await queue.get()
print("val = %d" % val)
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.create_task(consumer(queue))
loop.create_task(producer(queue, 1))
try:
loop.run_forever()
except KeyboardInterrupt:
loop.close()
输出:
val = 1
您需要并行启动消费者和生产者,例如像这样定义 main
:
async def main():
queue = asyncio.Queue()
await asyncio.gather(consumer(queue), producer(queue, 1))
如果由于某种原因您不能使用 gather
,那么您可以这样做(相当于):
async def main():
queue = asyncio.Queue()
asyncio.create_task(consumer(queue))
asyncio.create_task(producer(queue, 1))
await asyncio.sleep(100) # what your program actually does
Why isn't
await queue.get()
yielding control back to the event loop so that the producer coroutine can run which will populate the queue so thatqueue.get()
can return.
await queue.get()
是 将控制权交还给事件循环。但是 await 意味着 wait,所以当你的 main
协程说 await consumer(queue)
时,这意味着 "resume me once consumer(queue)
has completed." 因为 consumer(queue)
本身就是在等待某人生产某物,你有一个经典的死锁案例。
仅因为您的生产者是 one-shot,所以颠倒顺序才有效,因此它会立即 returns 给调用者。如果您的生产者碰巧等待外部源(例如套接字),您也会在那里遇到死锁。无论 producer
和 consumer
是如何写的,并行启动它们都可以避免死锁。