Python 在异步上下文中对标准输入进行非阻塞读取
Python non blocking read on stdin in an asynchronous context
我正在尝试构建一个异步运行的脚本,同时具有基于来自标准输入的用户输入的 start/stop 机制。
我创建了两个线程,一个用于异步工作,一个用于从标准输入读取。我的想法是,程序一直运行到用户在标准输入中键入“停止”,异步任务一直等到用户在标准输入中键入“开始”。
这是我当前的代码:
class DataManager(metaclass=Singleton):
def __init__(self):
self.flag = threading.Event()
self.flag.set()
# Thread for reading user input
self.pool_thread = threading.Thread(target=self.__pool_input())
self.pool_thread.daemon = True
self.pool_thread.start()
# Method to create thread for asynchronous tasks
def start(self):
if self.flag.is_set():
self.flag.clear()
self.main_thread = threading.Thread(target=self.__main_wrapper)
self.main_thread.start()
# Method for reading user stdin
def __pool_input(self):
val = input()
if val == "stop":
self.stop()
elif val == "start":
self.start()
# Wrapper method to start the event loop and call async context
def __main_wrapper(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.__main())
loop.close()
# Main async loop
async def __main(self):
while not self.flag.is_set():
# Run async stuff
# ...
print('Thread is running')
await asyncio.sleep(5)
# Stop the main async loop on user command
def stop(self):
if not self.flag.is_set():
self.flag.set()
if __name__ == "__main__":
data_manager = DataManager()
data_manager.start()
预期行为
- 异步(主)线程循环运行
- 用户输入“停止”,异步线程停止
- 用户键入“开始”,异步循环再次运行
当前行为
- 异步线程被阻塞,直到用户在标准输入上输入
- 异步线程启动运行
- Stdin 在异步线程运行时被阻塞
除了必须以某种方式保持 __pool_input
线程处于活动状态(因为一旦线程结束,它就会读取输入,而我再也不会启动它),我不知道如何实现预期的结果.
您的程序无法运行,因为您没有将 __pool_input
作为 target
传递,而是调用它。使用
self.pool_thread = threading.Thread(target=self.__pool_input)
我想它应该可以工作。一次。正如你所说。
您可以在 __pool_input()
中创建一个无限循环,就像在 __main_wrapper()
中一样,以再次从标准输入读取。
话虽这么说,我认为你应该改变你的设计。异步代码的伟大之处在于你不需要 线程(对于任何 I/O)线程非常困难。因此,最好尽可能避免使用它们。最近有一个关于 async input
的类似 question。也许你会在那里找到你喜欢的东西。
我正在尝试构建一个异步运行的脚本,同时具有基于来自标准输入的用户输入的 start/stop 机制。
我创建了两个线程,一个用于异步工作,一个用于从标准输入读取。我的想法是,程序一直运行到用户在标准输入中键入“停止”,异步任务一直等到用户在标准输入中键入“开始”。
这是我当前的代码:
class DataManager(metaclass=Singleton):
def __init__(self):
self.flag = threading.Event()
self.flag.set()
# Thread for reading user input
self.pool_thread = threading.Thread(target=self.__pool_input())
self.pool_thread.daemon = True
self.pool_thread.start()
# Method to create thread for asynchronous tasks
def start(self):
if self.flag.is_set():
self.flag.clear()
self.main_thread = threading.Thread(target=self.__main_wrapper)
self.main_thread.start()
# Method for reading user stdin
def __pool_input(self):
val = input()
if val == "stop":
self.stop()
elif val == "start":
self.start()
# Wrapper method to start the event loop and call async context
def __main_wrapper(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.__main())
loop.close()
# Main async loop
async def __main(self):
while not self.flag.is_set():
# Run async stuff
# ...
print('Thread is running')
await asyncio.sleep(5)
# Stop the main async loop on user command
def stop(self):
if not self.flag.is_set():
self.flag.set()
if __name__ == "__main__":
data_manager = DataManager()
data_manager.start()
预期行为
- 异步(主)线程循环运行
- 用户输入“停止”,异步线程停止
- 用户键入“开始”,异步循环再次运行
当前行为
- 异步线程被阻塞,直到用户在标准输入上输入
- 异步线程启动运行
- Stdin 在异步线程运行时被阻塞
除了必须以某种方式保持 __pool_input
线程处于活动状态(因为一旦线程结束,它就会读取输入,而我再也不会启动它),我不知道如何实现预期的结果.
您的程序无法运行,因为您没有将 __pool_input
作为 target
传递,而是调用它。使用
self.pool_thread = threading.Thread(target=self.__pool_input)
我想它应该可以工作。一次。正如你所说。
您可以在 __pool_input()
中创建一个无限循环,就像在 __main_wrapper()
中一样,以再次从标准输入读取。
话虽这么说,我认为你应该改变你的设计。异步代码的伟大之处在于你不需要 线程(对于任何 I/O)线程非常困难。因此,最好尽可能避免使用它们。最近有一个关于 async input
的类似 question。也许你会在那里找到你喜欢的东西。