Python 在异步上下文中对标准输入进行非阻塞读取

Python non blocking read on stdin in an asynchronous context

我正在尝试构建一个异步运行的脚本,同时具有基于来自标准输入的用户输入的 start/stop 机制。

我创建了两个线程,一个用于异步工作,一个用于从标准输入读取。我的想法是,程序一直运行到用户在标准输入中键入“停止”,异步任务一直等到用户在标准输入中键入“开始”。

这是我当前的代码:

class DataManager(metaclass=Singleton):

    def __init__(self):
        self.flag = threading.Event() 
        self.flag.set()
        
        # Thread for reading user input
        self.pool_thread = threading.Thread(target=self.__pool_input())
        self.pool_thread.daemon = True
        self.pool_thread.start()

    # Method to create thread for asynchronous tasks
    def start(self):
        if self.flag.is_set():
            self.flag.clear()
            self.main_thread = threading.Thread(target=self.__main_wrapper)
            self.main_thread.start()
   
    # Method for reading user stdin
    def __pool_input(self):
        val = input()

        if val == "stop":
            self.stop()

        elif val == "start":
            self.start()

    # Wrapper method to start the event loop and call async context
    def __main_wrapper(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.__main())
        loop.close()

    # Main async loop
    async def __main(self):
        while not self.flag.is_set():
            # Run async stuff
            # ...
            print('Thread is running')
            await asyncio.sleep(5)
     
    # Stop the main async loop on user command   
    def stop(self):
        if not self.flag.is_set():
            self.flag.set()


if __name__ == "__main__":
    data_manager = DataManager()
    data_manager.start()

预期行为

当前行为

除了必须以某种方式保持 __pool_input 线程处于活动状态(因为一旦线程结束,它就会读取输入,而我再也不会启动它),我不知道如何实现预期的结果.

您的程序无法运行,因为您没有将 __pool_input 作为 target 传递,而是调用它。使用

self.pool_thread = threading.Thread(target=self.__pool_input)

我想它应该可以工作。一次。正如你所说。

您可以在 __pool_input() 中创建一个无限循环,就像在 __main_wrapper() 中一样,以再次从标准输入读取。

话虽这么说,我认为你应该改变你的设计。异步代码的伟大之处在于你不需要 线程(对于任何 I/O)线程非常困难。因此,最好尽可能避免使用它们。最近有一个关于 async input 的类似 question。也许你会在那里找到你喜欢的东西。