来自生产者线程的 Asyncio 消费者无法正常工作
Asyncio consumer from producer thread not working
我有 2 个主线程 (consumer/producer) 通过 SimpleQueue
进行通信。我希望消费者的执行速度与队列中添加的一样快。我也想避免 asyncio.Queue
因为我想让消费者和生产者分离并灵活应对未来的变化。
我开始研究 gevent、asyncio 等,但它们让我感到很困惑。
from queue import SimpleQueue
from time import sleep
import threading
q = SimpleQueue()
q.put(1)
q.put(2)
q.put(3)
def serve_forever():
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
task = q.get()
print(f"Dequeued task {task}")
async def run_task():
print(f"Working on task {task}")
loop.create_task(run_task()) # run task
thread = threading.Thread(target=serve_forever)
thread.daemon = True
thread.start()
sleep(1)
输出:
Dequeued task 1
Dequeued task 2
Dequeued task 3
为什么 run_task
在我的案例中没有执行?
简单地调用 create_task
实际上并没有 运行 任何东西;你需要有一个 运行ning asyncio
事件循环,你可以通过调用 asyncio.run
或 loop.run_until_complete
.
之类的东西来获得它
您也不需要像现在这样创建明确的 loop
; asyncio 提供了一个默认的事件循环。
最后,如果您从不调用 await
,asyncio 任务将不会 运行,因为这是当前任务为其他任务分配执行时间的方式。因此,即使我们修复了之前的问题,您的任务也永远不会执行,因为执行会卡在您的 while
循环中。我们需要能够在 q.get()
调用中 await
,这在使用 queue.SimpleQueue
.
时是不可能的
我们可以解决上述问题 - 同时仍然使用 queue.SimpleQueue
- 通过使用 run_in_executor
方法 运行 non-async q.get
调用(这 运行 在单独的线程中进行调用,并允许我们异步等待结果)。以下代码按我认为的方式工作:
import asyncio
import threading
import queue
q = queue.SimpleQueue()
q.put(1)
q.put(2)
q.put(3)
async def run_task(task):
print(f"Working on task {task}")
async def serve_forever():
loop = asyncio.get_event_loop()
while True:
task = await loop.run_in_executor(None, lambda: q.get())
print(f"Dequeued task {task}")
asyncio.create_task(run_task(task)) # run task
def thread_main():
asyncio.run(serve_forever())
thread = threading.Thread(target=thread_main)
thread.daemon = True
thread.start()
# just hang around waiting for thread to exit (which it never will)
thread.join()
输出:
Dequeued task 1
Working on task 1
Dequeued task 2
Working on task 2
Dequeued task 3
Working on task 3
我有 2 个主线程 (consumer/producer) 通过 SimpleQueue
进行通信。我希望消费者的执行速度与队列中添加的一样快。我也想避免 asyncio.Queue
因为我想让消费者和生产者分离并灵活应对未来的变化。
我开始研究 gevent、asyncio 等,但它们让我感到很困惑。
from queue import SimpleQueue
from time import sleep
import threading
q = SimpleQueue()
q.put(1)
q.put(2)
q.put(3)
def serve_forever():
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
task = q.get()
print(f"Dequeued task {task}")
async def run_task():
print(f"Working on task {task}")
loop.create_task(run_task()) # run task
thread = threading.Thread(target=serve_forever)
thread.daemon = True
thread.start()
sleep(1)
输出:
Dequeued task 1
Dequeued task 2
Dequeued task 3
为什么 run_task
在我的案例中没有执行?
简单地调用 create_task
实际上并没有 运行 任何东西;你需要有一个 运行ning asyncio
事件循环,你可以通过调用 asyncio.run
或 loop.run_until_complete
.
您也不需要像现在这样创建明确的 loop
; asyncio 提供了一个默认的事件循环。
最后,如果您从不调用 await
,asyncio 任务将不会 运行,因为这是当前任务为其他任务分配执行时间的方式。因此,即使我们修复了之前的问题,您的任务也永远不会执行,因为执行会卡在您的 while
循环中。我们需要能够在 q.get()
调用中 await
,这在使用 queue.SimpleQueue
.
我们可以解决上述问题 - 同时仍然使用 queue.SimpleQueue
- 通过使用 run_in_executor
方法 运行 non-async q.get
调用(这 运行 在单独的线程中进行调用,并允许我们异步等待结果)。以下代码按我认为的方式工作:
import asyncio
import threading
import queue
q = queue.SimpleQueue()
q.put(1)
q.put(2)
q.put(3)
async def run_task(task):
print(f"Working on task {task}")
async def serve_forever():
loop = asyncio.get_event_loop()
while True:
task = await loop.run_in_executor(None, lambda: q.get())
print(f"Dequeued task {task}")
asyncio.create_task(run_task(task)) # run task
def thread_main():
asyncio.run(serve_forever())
thread = threading.Thread(target=thread_main)
thread.daemon = True
thread.start()
# just hang around waiting for thread to exit (which it never will)
thread.join()
输出:
Dequeued task 1
Working on task 1
Dequeued task 2
Working on task 2
Dequeued task 3
Working on task 3