有没有办法在多线程中使用 asyncio.Queue ?

Is there a way to use asyncio.Queue in multiple threads?

假设我有以下代码:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

此代码的问题在于 async 协程内的循环从未完成第一次迭代,而 queue 大小正在增加。

为什么会这样,我该如何解决?

我无法摆脱单独的线程,因为在我的实际代码中,我使用单独的线程与串行设备进行通信,而且我还没有找到使用 asyncio 来做到这一点的方法。

asyncio.Queueis not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus,这是一个提供线程感知asyncio队列的第三方库:

import asyncio
import threading
import janus

def threaded(squeue):
    import time
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

@asyncio.coroutine
def async(aqueue):
    while True:
        time = yield from aqueue.get()
        print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()

还有 aioprocessing(完全公开:我写的),它也提供进程安全(并且作为副作用,线程安全)队列,但如果你这样做就太过分了不要尝试使用 multiprocessing.

编辑

正如其他答案中指出的那样,对于简单的用例,您也可以使用 loop.call_soon_threadsafe 添加到队列中。

BaseEventLoop.call_soon_threadsafe就在眼前。有关详细信息,请参阅 asyncio doc

只需像这样更改您的 threaded()

def threaded():
    import time
    while True:
        time.sleep(1)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
        loop.call_soon_threadsafe(lambda: print(queue.qsize()))

这是一个示例输出:

0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943

如果您不想使用其他库,您可以从线程中安排协程。将 queue.put_nowait 替换为以下内容即可。

asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)

变量loop表示主线程中的事件循环。

编辑:

你的 async 协程没有做任何事情的原因是 事件循环从不给它机会这样做。队列对象是 不是线程安全的,如果你深入研究 cpython 代码,你会发现 这意味着 put_nowait 通过唤醒队列的消费者 在事件循环的 call_soon 方法中使用 future。如果 我们可以让它使用 call_soon_threadsafe 它应该可以工作。专业 然而,call_sooncall_soon_threadsafe 之间的区别是 call_soon_threadsafe 通过调用 loop._write_to_self() 唤醒事件循环。所以让我们自己称呼它:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        queue._loop._write_to_self()
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

然后,一切都按预期进行。

至于线程安全方面 访问共享对象,asyncio.queue 在幕后使用 collections.deque 具有线程安全 appendpopleft。 也许检查队列是否为空并且 popleft 不是原子的,但是如果 您仅在一个线程(事件循环之一)中使用队列 应该没问题。

其他提出的解决方案,loop.call_soon_threadsafe 来自 Huazuo 高的回答和我的asyncio.run_coroutine_threadsafe只是在做 这,唤醒了事件循环。

只使用 threading.Lock 和 asyncio.Queue 怎么样?

class ThreadSafeAsyncFuture(asyncio.Future):
    """ asyncio.Future is not thread-safe
    
    """
    def set_result(self, result):
        func = super().set_result
        call = lambda: func(result)
        self._loop.call_soon_threadsafe(call)  # Warning: self._loop is undocumented


class ThreadSafeAsyncQueue(queue.Queue):
    """ asyncio.Queue is not thread-safe, threading.Queue is not awaitable
    works only with one putter to unlimited-size queue and with several getters
    TODO: add maxsize limits
    TODO: make put corouitine
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = threading.Lock()
        self.loop = asyncio.get_event_loop()
        self.waiters = []

    def put(self, item):
        with self.lock:
            if self.waiters:
                self.waiters.pop(0).set_result(item)
            else:
                super().put(item)

    async def get(self):
        with self.lock:
            if not self.empty():
                return super().get()
            else:
                fut = ThreadSafeAsyncFuture()
                self.waiters.append(fut)
        result = await fut
        return result

另见 -