如何使用 urwid 和 asyncio 使长任务非阻塞?
How do I make a long task non-blocking with urwid and asyncio?
我正在编写一个 Python curses 应用程序,它通过进程发送和接收字符串来控制外部(Linux,如果有帮助的话)进程' stdin
和 stdout
,分别。界面使用urwid
。我已经写了一个 class 来控制外部进程和一些其他的一些 urwid 组件。
我还有一个按钮,可以向外部进程发送命令。然而,该进程不会立即响应,它的任务通常需要几秒钟,在此期间我希望界面不要冻结。
这是我 运行 子进程的方式:
def run(self, args):
import io, fcntl, os
from subprocess import Popen, PIPE
# Run wpa_cli with arguments, use a thread to feed the process with an input queue
self._pss = Popen(["wpa_cli"] + args, stdout=PIPE, stdin=PIPE)
self.stdout = io.TextIOWrapper(self._pss.stdout, encoding="utf-8")
self.stdin = io.TextIOWrapper(self._pss.stdin, encoding="utf-8", line_buffering=True)
# Make the process' stdout a non-blocking file
fd = self.stdout.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
...
我不得不使进程的输出流成为非阻塞的,以便能够解析它的输出。我不知道这对我的问题是否重要。
以下是我用来控制子进程输入和输出流的方法:
def read(self, parser=None, transform=None, sentinel='>'):
""" Reads from the controlled process' standard output until a sentinel
is found. Optionally execute a callable object with every line. Parsed
lines are placed in a list, which the function returns upon exiting. """
if not transform:
transform = lambda str: str
def readline():
return transform(self.stdout.readline().strip())
# Keep a list of (non-empty) parsed lines
items = []
for line in iter(readline, sentinel):
if callable(parser):
item = parser(line)
if item is not None:
items.append(item)
return items
def send(self, command, echo=True):
""" Sends a command to the controlled process. Action commands are
echoed to the standard output. Argument echo controls whether or not
they're removed by the reader function before parsing. """
print(command, file=self.stdin)
# Should we remove the echoed command?
if not echo:
self.read(sentinel=command)
我谈到的按钮只是从主脚本入口函数设置了回调。该回调应该向子进程发送命令并循环遍历生成的输出行,直到找到给定的文本,在这种情况下回调函数退出。在此之前,该过程会输出一些我需要捕捉并显示在用户界面中的有趣信息。
例如:
def button_callback():
# This is just an illustration
filter = re.compile('(event1|event2|...)')
def formatter(text):
try:
return re.search(filter, text).group(1)
except AttributeError:
return text
def parser(text):
if text == 'event1':
# Update the info Text accordingly
if text == 'event2':
# Update the info Text accordingly
controller.send('command')
controller.read(sentinel='beacon', parser=parser, transform=formatter)
需要注意的是:
read()
函数自旋(我找不到另一种方法),即使进程输出流是静默的,直到从(可选)解析的行中读取标记值,
- 直到按钮回调函数退出后,urwid 界面才会刷新,这会阻止
urwid
的主循环刷新屏幕。
我可以使用线程,但据我所知,urwid
支持 asyncio
,这就是我想要实现的。你可以说我笨,因为即使在浏览了 urwid
asyncio 示例并阅读了 Python asyncio
文档后,我也无法清楚地弄清楚是怎么回事。
鉴于有改变这些方法的空间,我仍然希望保持过程控制 class — 即包含 read()
和 send()
的过程 — 与可能。
到目前为止,我没有尝试在进程繁忙时更新界面。接收进程的组件 "notifications" 是一个普通的 urwid.Text()
小部件。
首先要做两件事:
你不一定需要 asyncio 来使用 urwid 做异步的事情,因为它已经有一个通常足够好的 simple event loop,它有处理许多 IO 场景的原语。
编写异步代码时,需要注意编写同步代码,就像那些循环直到找到哨兵值的函数一样,因为它会阻止任何其他代码(包括事件循环本身)执行: 这意味着 UI 将冻结直到该函数 returns
对于您的情况,您可以使用默认的简单事件循环并使用 MainLoop.watch_pipe
方法,该方法创建一个准备在子进程中使用的管道(已将其放入 async/non-blocking模式,顺便说一句 :)) 并在有新数据写入管道时调用回调。
这是一个使用它的简单示例,显示了 shell 命令的输出,同时保持 UI non-blocked(注意,因为懒惰而使用了一些全局变量):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import, division
import subprocess
import urwid
def show_or_exit(key):
if key in ('q', 'Q', 'esc'):
raise urwid.ExitMainLoop()
def update_text(read_data):
text.set_text(text.text + read_data)
def enter_idle():
loop.remove_watch_file(pipe.stdout)
if __name__ == '__main__':
widget = urwid.Pile([
urwid.Button('Here is a button'),
urwid.Button('And here another button'),
urwid.Button('One more, just to be sure'),
urwid.Button("Heck, let's add yet another one!"),
])
text = urwid.Text('PROCESS OUTPUT:\n')
widget = urwid.Columns([widget, text])
widget = urwid.Filler(widget, 'top')
loop = urwid.MainLoop(widget, unhandled_input=show_or_exit)
stdout = loop.watch_pipe(update_text)
stderr = loop.watch_pipe(update_text)
pipe = subprocess.Popen('for i in $(seq 50); do echo -n "$i "; sleep 0.5; done',
shell=True, stdout=stdout, stderr=stderr)
loop.run()
请注意回调 update_text
中的代码如何没有理由阻塞:它获取已读取的数据,更新组件,仅此而已。没有 while 循环等待其他事情发生。
在您的情况下,您可能需要调整解析 wpa_cli
输出的函数,以便它们也没有理由阻止。例如,与其在循环中等待直到找到值,他们可以设置一些变量或以其他方式发出信号,当他们找到或没有找到有趣的哨兵值时。
我希望这是有道理的,如果您需要澄清某些事情,请告诉我! :)
我正在编写一个 Python curses 应用程序,它通过进程发送和接收字符串来控制外部(Linux,如果有帮助的话)进程' stdin
和 stdout
,分别。界面使用urwid
。我已经写了一个 class 来控制外部进程和一些其他的一些 urwid 组件。
我还有一个按钮,可以向外部进程发送命令。然而,该进程不会立即响应,它的任务通常需要几秒钟,在此期间我希望界面不要冻结。
这是我 运行 子进程的方式:
def run(self, args):
import io, fcntl, os
from subprocess import Popen, PIPE
# Run wpa_cli with arguments, use a thread to feed the process with an input queue
self._pss = Popen(["wpa_cli"] + args, stdout=PIPE, stdin=PIPE)
self.stdout = io.TextIOWrapper(self._pss.stdout, encoding="utf-8")
self.stdin = io.TextIOWrapper(self._pss.stdin, encoding="utf-8", line_buffering=True)
# Make the process' stdout a non-blocking file
fd = self.stdout.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
...
我不得不使进程的输出流成为非阻塞的,以便能够解析它的输出。我不知道这对我的问题是否重要。
以下是我用来控制子进程输入和输出流的方法:
def read(self, parser=None, transform=None, sentinel='>'):
""" Reads from the controlled process' standard output until a sentinel
is found. Optionally execute a callable object with every line. Parsed
lines are placed in a list, which the function returns upon exiting. """
if not transform:
transform = lambda str: str
def readline():
return transform(self.stdout.readline().strip())
# Keep a list of (non-empty) parsed lines
items = []
for line in iter(readline, sentinel):
if callable(parser):
item = parser(line)
if item is not None:
items.append(item)
return items
def send(self, command, echo=True):
""" Sends a command to the controlled process. Action commands are
echoed to the standard output. Argument echo controls whether or not
they're removed by the reader function before parsing. """
print(command, file=self.stdin)
# Should we remove the echoed command?
if not echo:
self.read(sentinel=command)
我谈到的按钮只是从主脚本入口函数设置了回调。该回调应该向子进程发送命令并循环遍历生成的输出行,直到找到给定的文本,在这种情况下回调函数退出。在此之前,该过程会输出一些我需要捕捉并显示在用户界面中的有趣信息。
例如:
def button_callback():
# This is just an illustration
filter = re.compile('(event1|event2|...)')
def formatter(text):
try:
return re.search(filter, text).group(1)
except AttributeError:
return text
def parser(text):
if text == 'event1':
# Update the info Text accordingly
if text == 'event2':
# Update the info Text accordingly
controller.send('command')
controller.read(sentinel='beacon', parser=parser, transform=formatter)
需要注意的是:
read()
函数自旋(我找不到另一种方法),即使进程输出流是静默的,直到从(可选)解析的行中读取标记值,- 直到按钮回调函数退出后,urwid 界面才会刷新,这会阻止
urwid
的主循环刷新屏幕。
我可以使用线程,但据我所知,urwid
支持 asyncio
,这就是我想要实现的。你可以说我笨,因为即使在浏览了 urwid
asyncio 示例并阅读了 Python asyncio
文档后,我也无法清楚地弄清楚是怎么回事。
鉴于有改变这些方法的空间,我仍然希望保持过程控制 class — 即包含 read()
和 send()
的过程 — 与可能。
到目前为止,我没有尝试在进程繁忙时更新界面。接收进程的组件 "notifications" 是一个普通的 urwid.Text()
小部件。
首先要做两件事:
你不一定需要 asyncio 来使用 urwid 做异步的事情,因为它已经有一个通常足够好的 simple event loop,它有处理许多 IO 场景的原语。
编写异步代码时,需要注意编写同步代码,就像那些循环直到找到哨兵值的函数一样,因为它会阻止任何其他代码(包括事件循环本身)执行: 这意味着 UI 将冻结直到该函数 returns
对于您的情况,您可以使用默认的简单事件循环并使用 MainLoop.watch_pipe
方法,该方法创建一个准备在子进程中使用的管道(已将其放入 async/non-blocking模式,顺便说一句 :)) 并在有新数据写入管道时调用回调。
这是一个使用它的简单示例,显示了 shell 命令的输出,同时保持 UI non-blocked(注意,因为懒惰而使用了一些全局变量):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import, division
import subprocess
import urwid
def show_or_exit(key):
if key in ('q', 'Q', 'esc'):
raise urwid.ExitMainLoop()
def update_text(read_data):
text.set_text(text.text + read_data)
def enter_idle():
loop.remove_watch_file(pipe.stdout)
if __name__ == '__main__':
widget = urwid.Pile([
urwid.Button('Here is a button'),
urwid.Button('And here another button'),
urwid.Button('One more, just to be sure'),
urwid.Button("Heck, let's add yet another one!"),
])
text = urwid.Text('PROCESS OUTPUT:\n')
widget = urwid.Columns([widget, text])
widget = urwid.Filler(widget, 'top')
loop = urwid.MainLoop(widget, unhandled_input=show_or_exit)
stdout = loop.watch_pipe(update_text)
stderr = loop.watch_pipe(update_text)
pipe = subprocess.Popen('for i in $(seq 50); do echo -n "$i "; sleep 0.5; done',
shell=True, stdout=stdout, stderr=stderr)
loop.run()
请注意回调 update_text
中的代码如何没有理由阻塞:它获取已读取的数据,更新组件,仅此而已。没有 while 循环等待其他事情发生。
在您的情况下,您可能需要调整解析 wpa_cli
输出的函数,以便它们也没有理由阻止。例如,与其在循环中等待直到找到值,他们可以设置一些变量或以其他方式发出信号,当他们找到或没有找到有趣的哨兵值时。
我希望这是有道理的,如果您需要澄清某些事情,请告诉我! :)