有没有办法在多线程中使用 asyncio.Queue ?
Is there a way to use asyncio.Queue in multiple threads?
假设我有以下代码:
import asyncio
import threading
queue = asyncio.Queue()
def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
print(queue.qsize())
@asyncio.coroutine
def async():
while True:
time = yield from queue.get()
print(time)
loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
此代码的问题在于 async
协程内的循环从未完成第一次迭代,而 queue
大小正在增加。
为什么会这样,我该如何解决?
我无法摆脱单独的线程,因为在我的实际代码中,我使用单独的线程与串行设备进行通信,而且我还没有找到使用 asyncio
来做到这一点的方法。
asyncio.Queue
is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus
,这是一个提供线程感知asyncio
队列的第三方库:
import asyncio
import threading
import janus
def threaded(squeue):
import time
while True:
time.sleep(2)
squeue.put_nowait(time.time())
print(squeue.qsize())
@asyncio.coroutine
def async(aqueue):
while True:
time = yield from aqueue.get()
print(time)
loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()
还有 aioprocessing
(完全公开:我写的),它也提供进程安全(并且作为副作用,线程安全)队列,但如果你这样做就太过分了不要尝试使用 multiprocessing
.
编辑
正如其他答案中指出的那样,对于简单的用例,您也可以使用 loop.call_soon_threadsafe
添加到队列中。
BaseEventLoop.call_soon_threadsafe
就在眼前。有关详细信息,请参阅 asyncio
doc。
只需像这样更改您的 threaded()
:
def threaded():
import time
while True:
time.sleep(1)
loop.call_soon_threadsafe(queue.put_nowait, time.time())
loop.call_soon_threadsafe(lambda: print(queue.qsize()))
这是一个示例输出:
0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943
如果您不想使用其他库,您可以从线程中安排协程。将 queue.put_nowait
替换为以下内容即可。
asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
变量loop
表示主线程中的事件循环。
编辑:
你的 async
协程没有做任何事情的原因是
事件循环从不给它机会这样做。队列对象是
不是线程安全的,如果你深入研究 cpython 代码,你会发现
这意味着 put_nowait
通过唤醒队列的消费者
在事件循环的 call_soon
方法中使用 future。如果
我们可以让它使用 call_soon_threadsafe
它应该可以工作。专业
然而,call_soon
和 call_soon_threadsafe
之间的区别是
call_soon_threadsafe
通过调用 loop._write_to_self()
唤醒事件循环。所以让我们自己称呼它:
import asyncio
import threading
queue = asyncio.Queue()
def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
queue._loop._write_to_self()
print(queue.qsize())
@asyncio.coroutine
def async():
while True:
time = yield from queue.get()
print(time)
loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
然后,一切都按预期进行。
至于线程安全方面
访问共享对象,asyncio.queue
在幕后使用
collections.deque
具有线程安全 append
和 popleft
。
也许检查队列是否为空并且 popleft 不是原子的,但是如果
您仅在一个线程(事件循环之一)中使用队列
应该没问题。
其他提出的解决方案,loop.call_soon_threadsafe
来自 Huazuo
高的回答和我的asyncio.run_coroutine_threadsafe
只是在做
这,唤醒了事件循环。
只使用 threading.Lock 和 asyncio.Queue 怎么样?
class ThreadSafeAsyncFuture(asyncio.Future):
""" asyncio.Future is not thread-safe
"""
def set_result(self, result):
func = super().set_result
call = lambda: func(result)
self._loop.call_soon_threadsafe(call) # Warning: self._loop is undocumented
class ThreadSafeAsyncQueue(queue.Queue):
""" asyncio.Queue is not thread-safe, threading.Queue is not awaitable
works only with one putter to unlimited-size queue and with several getters
TODO: add maxsize limits
TODO: make put corouitine
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.lock = threading.Lock()
self.loop = asyncio.get_event_loop()
self.waiters = []
def put(self, item):
with self.lock:
if self.waiters:
self.waiters.pop(0).set_result(item)
else:
super().put(item)
async def get(self):
with self.lock:
if not self.empty():
return super().get()
else:
fut = ThreadSafeAsyncFuture()
self.waiters.append(fut)
result = await fut
return result
另见 -
假设我有以下代码:
import asyncio
import threading
queue = asyncio.Queue()
def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
print(queue.qsize())
@asyncio.coroutine
def async():
while True:
time = yield from queue.get()
print(time)
loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
此代码的问题在于 async
协程内的循环从未完成第一次迭代,而 queue
大小正在增加。
为什么会这样,我该如何解决?
我无法摆脱单独的线程,因为在我的实际代码中,我使用单独的线程与串行设备进行通信,而且我还没有找到使用 asyncio
来做到这一点的方法。
asyncio.Queue
is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus
,这是一个提供线程感知asyncio
队列的第三方库:
import asyncio
import threading
import janus
def threaded(squeue):
import time
while True:
time.sleep(2)
squeue.put_nowait(time.time())
print(squeue.qsize())
@asyncio.coroutine
def async(aqueue):
while True:
time = yield from aqueue.get()
print(time)
loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()
还有 aioprocessing
(完全公开:我写的),它也提供进程安全(并且作为副作用,线程安全)队列,但如果你这样做就太过分了不要尝试使用 multiprocessing
.
编辑
正如其他答案中指出的那样,对于简单的用例,您也可以使用 loop.call_soon_threadsafe
添加到队列中。
BaseEventLoop.call_soon_threadsafe
就在眼前。有关详细信息,请参阅 asyncio
doc。
只需像这样更改您的 threaded()
:
def threaded():
import time
while True:
time.sleep(1)
loop.call_soon_threadsafe(queue.put_nowait, time.time())
loop.call_soon_threadsafe(lambda: print(queue.qsize()))
这是一个示例输出:
0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943
如果您不想使用其他库,您可以从线程中安排协程。将 queue.put_nowait
替换为以下内容即可。
asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
变量loop
表示主线程中的事件循环。
编辑:
你的 async
协程没有做任何事情的原因是
事件循环从不给它机会这样做。队列对象是
不是线程安全的,如果你深入研究 cpython 代码,你会发现
这意味着 put_nowait
通过唤醒队列的消费者
在事件循环的 call_soon
方法中使用 future。如果
我们可以让它使用 call_soon_threadsafe
它应该可以工作。专业
然而,call_soon
和 call_soon_threadsafe
之间的区别是
call_soon_threadsafe
通过调用 loop._write_to_self()
唤醒事件循环。所以让我们自己称呼它:
import asyncio
import threading
queue = asyncio.Queue()
def threaded():
import time
while True:
time.sleep(2)
queue.put_nowait(time.time())
queue._loop._write_to_self()
print(queue.qsize())
@asyncio.coroutine
def async():
while True:
time = yield from queue.get()
print(time)
loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
然后,一切都按预期进行。
至于线程安全方面
访问共享对象,asyncio.queue
在幕后使用
collections.deque
具有线程安全 append
和 popleft
。
也许检查队列是否为空并且 popleft 不是原子的,但是如果
您仅在一个线程(事件循环之一)中使用队列
应该没问题。
其他提出的解决方案,loop.call_soon_threadsafe
来自 Huazuo
高的回答和我的asyncio.run_coroutine_threadsafe
只是在做
这,唤醒了事件循环。
只使用 threading.Lock 和 asyncio.Queue 怎么样?
class ThreadSafeAsyncFuture(asyncio.Future):
""" asyncio.Future is not thread-safe
"""
def set_result(self, result):
func = super().set_result
call = lambda: func(result)
self._loop.call_soon_threadsafe(call) # Warning: self._loop is undocumented
class ThreadSafeAsyncQueue(queue.Queue):
""" asyncio.Queue is not thread-safe, threading.Queue is not awaitable
works only with one putter to unlimited-size queue and with several getters
TODO: add maxsize limits
TODO: make put corouitine
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.lock = threading.Lock()
self.loop = asyncio.get_event_loop()
self.waiters = []
def put(self, item):
with self.lock:
if self.waiters:
self.waiters.pop(0).set_result(item)
else:
super().put(item)
async def get(self):
with self.lock:
if not self.empty():
return super().get()
else:
fut = ThreadSafeAsyncFuture()
self.waiters.append(fut)
result = await fut
return result
另见 -