将项目从同步代码 运行 放入单独线程中的 asyncio.Queue 的最简单方法

simplest way to put an item in an asyncio.Queue from sync code running in a separate thread

Python 抱怨 RuntimeWarning: coroutine 'Queue.put' was never awaited

我搜索了一下,发现有像 Janus 这样的库可以解决这样的问题。但是在 3.8 上有更好的方法吗?

更新。我能够使用 create_task 将项目放入队列中,但它会在 getput 上阻塞,直到系统中发生其他异步事件才停止阻塞,即使应该现在成为队列中的一个项目,因为它不需要阻塞。任何想法为什么会发生?它需要大约 10-20 秒才能自动解除阻塞,但如果我发送另一个事件,它会立即解除对前一个事件的阻塞,但除非我发送另一个事件,否则当前事件会有延迟。

您正在从运行事件循环的线程外部调用 create_task。您应该使用 asyncio.run_coroutine_threadsafe 代替:

if result:
    # tell asyncio to enqueue the result
    fut = asyncio.run_coroutine_threadsafe(
        tasks.completed.put(result), loop)
    # wait for the result to be enqueued
    fut.result()

(您应该在主线程中检索循环并将其传递给线程。)

如果您的队列是无界的并且您不需要处理背压,您可以使用 call_soon_threadsafe:

调用 put_nowait
if result:
    # tell asyncio to enqueue the result
    loop.call_soon_threadsafe(
        tasks.completed.put_nowait, result)

I was able to use create_task to put the item in the queue but it's either blocking on get or put until some other async event occurs in the system before it stops blocking even though there should now be an item in the queue for it not to need to block.

这是因为 loop.create_task 不是线程安全的,所以它不能正确地通知事件循环发生了某些事情。