asyncio:等待来自其他线程的事件
asyncio: Wait for event from other thread
我正在 Python 中设计一个应用程序,它应该访问一台机器来执行一些(冗长的)任务。 asyncio 模块似乎是所有与网络相关的东西的不错选择,但现在我需要访问一个特定组件的串行端口。我已经为实际的串行端口实现了一种抽象层,但无法弄清楚如何将它与 asyncio 合理地集成。
以下设置:我有一个线程 运行 一个循环,它定期与机器对话并解码响应。使用方法 enqueue_query()
,我可以将查询字符串放入队列中,然后由其他线程将其发送到机器并引起响应。通过传入 threading.Event
(或带有 set()
方法的任何内容),调用者可以执行阻塞等待响应。这看起来像这样:
f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())
我现在的目标是将这些行放入协同程序中,并让 asyncio 处理这种等待,以便应用程序可以同时做其他事情。我怎么能这样做?它可能会通过将 f.wait()
包装到一个执行器中来工作,但这似乎有点愚蠢,因为这会创建一个新线程只等待另一个线程做某事。
最简单的方法就是完全按照您的建议进行 - 在执行程序中包装对 f.wait()
的调用:
@asyncio.coroutine
def do_enqueue():
f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
yield from loop.run_in_executor(None, f.wait)
print(ch.get_query_responses())
您确实会产生启动线程池的开销(至少对于第一次调用,线程池将从那时起保留在内存中),但是任何提供类似 [=12= 的实现的解决方案] 使用线程安全的阻塞和非阻塞 API,在不依赖任何内部后台线程的情况下,工作量会大很多。
By passing in a threading.Event
(or anything with a set()
method), the caller can perform a blocking wait for the response.
鉴于查询函数的上述行为,您所需要的只是 asyncio.Event
的线程安全版本。仅需 3 行代码:
import asyncio
class Event_ts(asyncio.Event):
#TODO: clear() method
def set(self):
#FIXME: The _loop attribute is not documented as public api!
self._loop.call_soon_threadsafe(super().set)
功能测试:
def threaded(event):
import time
while True:
event.set()
time.sleep(1)
async def main():
import threading
e = Event_ts()
threading.Thread(target=threaded, args=(e,)).start()
while True:
await e.wait()
e.clear()
print('whatever')
asyncio.ensure_future(main())
asyncio.get_event_loop().run_forever()
Huazuo Gao 的回答中的 class Event_ts
在 Python 3.9 及以下版本中效果很好,但在 3.10 中效果不佳。这是因为在 Python 3.10 中私有属性 _loop
最初是 None
.
以下代码适用于 Python 3.10 以及 3.9 及以下版本。 (我也添加了 clear()
方法。)
import asyncio
class Event_ts(asyncio.Event):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._loop is None:
self._loop = asyncio.get_event_loop()
def set(self):
self._loop.call_soon_threadsafe(super().set)
def clear(self):
self._loop.call_soon_threadsafe(super().clear)
我正在 Python 中设计一个应用程序,它应该访问一台机器来执行一些(冗长的)任务。 asyncio 模块似乎是所有与网络相关的东西的不错选择,但现在我需要访问一个特定组件的串行端口。我已经为实际的串行端口实现了一种抽象层,但无法弄清楚如何将它与 asyncio 合理地集成。
以下设置:我有一个线程 运行 一个循环,它定期与机器对话并解码响应。使用方法 enqueue_query()
,我可以将查询字符串放入队列中,然后由其他线程将其发送到机器并引起响应。通过传入 threading.Event
(或带有 set()
方法的任何内容),调用者可以执行阻塞等待响应。这看起来像这样:
f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())
我现在的目标是将这些行放入协同程序中,并让 asyncio 处理这种等待,以便应用程序可以同时做其他事情。我怎么能这样做?它可能会通过将 f.wait()
包装到一个执行器中来工作,但这似乎有点愚蠢,因为这会创建一个新线程只等待另一个线程做某事。
最简单的方法就是完全按照您的建议进行 - 在执行程序中包装对 f.wait()
的调用:
@asyncio.coroutine
def do_enqueue():
f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
yield from loop.run_in_executor(None, f.wait)
print(ch.get_query_responses())
您确实会产生启动线程池的开销(至少对于第一次调用,线程池将从那时起保留在内存中),但是任何提供类似 [=12= 的实现的解决方案] 使用线程安全的阻塞和非阻塞 API,在不依赖任何内部后台线程的情况下,工作量会大很多。
By passing in a
threading.Event
(or anything with aset()
method), the caller can perform a blocking wait for the response.
鉴于查询函数的上述行为,您所需要的只是 asyncio.Event
的线程安全版本。仅需 3 行代码:
import asyncio
class Event_ts(asyncio.Event):
#TODO: clear() method
def set(self):
#FIXME: The _loop attribute is not documented as public api!
self._loop.call_soon_threadsafe(super().set)
功能测试:
def threaded(event):
import time
while True:
event.set()
time.sleep(1)
async def main():
import threading
e = Event_ts()
threading.Thread(target=threaded, args=(e,)).start()
while True:
await e.wait()
e.clear()
print('whatever')
asyncio.ensure_future(main())
asyncio.get_event_loop().run_forever()
Huazuo Gao 的回答中的 class Event_ts
在 Python 3.9 及以下版本中效果很好,但在 3.10 中效果不佳。这是因为在 Python 3.10 中私有属性 _loop
最初是 None
.
以下代码适用于 Python 3.10 以及 3.9 及以下版本。 (我也添加了 clear()
方法。)
import asyncio
class Event_ts(asyncio.Event):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._loop is None:
self._loop = asyncio.get_event_loop()
def set(self):
self._loop.call_soon_threadsafe(super().set)
def clear(self):
self._loop.call_soon_threadsafe(super().clear)