来自生产者线程的 Asyncio 消费者无法正常工作

Asyncio consumer from producer thread not working

我有 2 个主线程 (consumer/producer) 通过 SimpleQueue 进行通信。我希望消费者的执行速度与队列中添加的一样快。我也想避免 asyncio.Queue 因为我想让消费者和生产者分离并灵活应对未来的变化。

我开始研究 gevent、asyncio 等,但它们让我感到很困惑。

from queue import SimpleQueue
from time import sleep
import threading

q = SimpleQueue()
q.put(1)
q.put(2)
q.put(3)


def serve_forever():
    import asyncio
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    while True:
        task = q.get()
        print(f"Dequeued task {task}")
        async def run_task():
            print(f"Working on task {task}")
        loop.create_task(run_task())  # run task


thread = threading.Thread(target=serve_forever)
thread.daemon = True
thread.start()

sleep(1)

输出:

Dequeued task 1
Dequeued task 2
Dequeued task 3

为什么 run_task 在我的案例中没有执行?

简单地调用 create_task 实际上并没有 运行 任何东西;你需要有一个 运行ning asyncio 事件循环,你可以通过调用 asyncio.runloop.run_until_complete.

之类的东西来获得它

您也不需要像现在这样创建明确的 loop; asyncio 提供了一个默认的事件循环。

最后,如果您从不调用 await,asyncio 任务将不会 运行,因为这是当前任务为其他任务分配执行时间的方式。因此,即使我们修复了之前的问题,您的任务也永远不会执行,因为执行会卡在您的 while 循环中。我们需要能够在 q.get() 调用中 await,这在使用 queue.SimpleQueue.

时是不可能的

我们可以解决上述问题 - 同时仍然使用 queue.SimpleQueue - 通过使用 run_in_executor 方法 运行 non-async q.get 调用(这 运行 在单独的线程中进行调用,并允许我们异步等待结果)。以下代码按我认为的方式工作:

import asyncio
import threading
import queue

q = queue.SimpleQueue()

q.put(1)
q.put(2)
q.put(3)


async def run_task(task):
    print(f"Working on task {task}")


async def serve_forever():
    loop = asyncio.get_event_loop()

    while True:
        task = await loop.run_in_executor(None, lambda: q.get())
        print(f"Dequeued task {task}")
        asyncio.create_task(run_task(task))  # run task


def thread_main():
    asyncio.run(serve_forever())


thread = threading.Thread(target=thread_main)
thread.daemon = True
thread.start()

# just hang around waiting for thread to exit (which it never will)
thread.join()

输出:

Dequeued task 1
Working on task 1
Dequeued task 2
Working on task 2
Dequeued task 3
Working on task 3