用 Popen 模仿 glib.spawn_async…
Mimicing glib.spawn_async with Popen…
函数 glib.spawn_async 允许您挂接三个回调,这些回调在 stdout
、stderr
和进程完成时调用。
如何使用线程或 asyncio 模仿 subprocess 的相同功能?
我对功能比 threading/asynio 更感兴趣,但同时包含这两个功能的答案将获得赏金。
这是一个演示我想做什么的玩具程序:
import glib
import logging
import os
import gtk
class MySpawn(object):
def __init__(self):
self._logger = logging.getLogger(self.__class__.__name__)
def execute(self, cmd, on_done, on_stdout, on_stderr):
self.pid, self.idin, self.idout, self.iderr = \
glib.spawn_async(cmd,
flags=glib.SPAWN_DO_NOT_REAP_CHILD,
standard_output=True,
standard_error=True)
fout = os.fdopen(self.idout, "r")
ferr = os.fdopen(self.iderr, "r")
glib.child_watch_add(self.pid, on_done)
glib.io_add_watch(fout, glib.IO_IN, on_stdout)
glib.io_add_watch(ferr, glib.IO_IN, on_stderr)
return self.pid
if __name__ == '__main__':
logging.basicConfig(format='%(thread)d %(levelname)s: %(message)s',
level=logging.DEBUG)
cmd = '/usr/bin/git ls-remote https://github.com/DiffSK/configobj'.split()
def on_done(pid, retval, *args):
logging.info("That's all folks!…")
def on_stdout(fobj, cond):
"""This blocks which is fine for this toy example…"""
for line in fobj.readlines():
logging.info(line.strip())
return True
def on_stderr(fobj, cond):
"""This blocks which is fine for this toy example…"""
for line in fobj.readlines():
logging.error(line.strip())
return True
runner = MySpawn()
runner.execute(cmd, on_done, on_stdout, on_stderr)
try:
gtk.main()
except KeyboardInterrupt:
print('')
我应该补充一点,因为 readlines()
是阻塞的,上面将缓冲所有输出并立即发送。如果这不是你想要的,那么你必须使用 readline()
并确保在命令结束时你读完了之前没有读过的所有行。
asyncio有subprocess_exec,完全不用subprocess模块:
import asyncio
class Handler(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
# fd == 1 for stdout, and 2 for stderr
print("Data from /bin/ls on fd %d: %s" % (fd, data.decode()))
def pipe_connection_lost(self, fd, exc):
print("Connection lost to /bin/ls")
def process_exited(self):
print("/bin/ls is finished.")
loop = asyncio.get_event_loop()
coro = loop.subprocess_exec(Handler, "/bin/ls", "/")
loop.run_until_complete(coro)
loop.close()
有了子进程和线程,也很简单。您可以只为每个管道生成一个线程,并为进程生成一个线程 wait()
:
import subprocess
import threading
class PopenWrapper(object):
def __init__(self, args):
self.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL)
self.stdout_reader_thread = threading.Thread(target=self._reader, args=(self.process.stdout,))
self.stderr_reader_thread = threading.Thread(target=self._reader, args=(self.process.stderr,))
self.exit_watcher = threading.Thread(target=self._exit_watcher)
self.stdout_reader_thread.start()
self.stderr_reader_thread.start()
self.exit_watcher.start()
def _reader(self, fileobj):
for line in fileobj:
self.on_data(fileobj, line)
def _exit_watcher(self):
self.process.wait()
self.stdout_reader_thread.join()
self.stderr_reader_thread.join()
self.on_exit()
def on_data(self, fd, data):
return NotImplementedError
def on_exit(self):
return NotImplementedError
def join(self):
self.process.wait()
class LsWrapper(PopenWrapper):
def on_data(self, fd, data):
print("Received on fd %r: %s" % (fd, data))
def on_exit(self):
print("Process exited.")
LsWrapper(["/bin/ls", "/"]).join()
但是,请注意 glib 不会 使用线程异步执行回调。它使用事件循环,就像 asyncio 一样。这个想法是,在您的程序的核心是一个循环,该循环等待直到某些事情发生,然后同步执行关联的回调。在您的例子中,是 "data becomes available for reading on one of the pipes" 和 "the subprocess has exited"。通常,它还有 "the X11-server reported mouse movement"、"there's incoming network traffic" 等内容。您可以通过编写自己的事件循环来模拟 glib 的行为。在两个管道上使用 select
module。如果 select 报告管道可读,但 read
returns 没有数据,则进程可能已退出 - 在这种情况下调用子进程对象上的 poll()
方法来检查是否它已完成,如果有则调用您的退出回调,否则调用错误回调。
函数 glib.spawn_async 允许您挂接三个回调,这些回调在 stdout
、stderr
和进程完成时调用。
如何使用线程或 asyncio 模仿 subprocess 的相同功能?
我对功能比 threading/asynio 更感兴趣,但同时包含这两个功能的答案将获得赏金。
这是一个演示我想做什么的玩具程序:
import glib
import logging
import os
import gtk
class MySpawn(object):
def __init__(self):
self._logger = logging.getLogger(self.__class__.__name__)
def execute(self, cmd, on_done, on_stdout, on_stderr):
self.pid, self.idin, self.idout, self.iderr = \
glib.spawn_async(cmd,
flags=glib.SPAWN_DO_NOT_REAP_CHILD,
standard_output=True,
standard_error=True)
fout = os.fdopen(self.idout, "r")
ferr = os.fdopen(self.iderr, "r")
glib.child_watch_add(self.pid, on_done)
glib.io_add_watch(fout, glib.IO_IN, on_stdout)
glib.io_add_watch(ferr, glib.IO_IN, on_stderr)
return self.pid
if __name__ == '__main__':
logging.basicConfig(format='%(thread)d %(levelname)s: %(message)s',
level=logging.DEBUG)
cmd = '/usr/bin/git ls-remote https://github.com/DiffSK/configobj'.split()
def on_done(pid, retval, *args):
logging.info("That's all folks!…")
def on_stdout(fobj, cond):
"""This blocks which is fine for this toy example…"""
for line in fobj.readlines():
logging.info(line.strip())
return True
def on_stderr(fobj, cond):
"""This blocks which is fine for this toy example…"""
for line in fobj.readlines():
logging.error(line.strip())
return True
runner = MySpawn()
runner.execute(cmd, on_done, on_stdout, on_stderr)
try:
gtk.main()
except KeyboardInterrupt:
print('')
我应该补充一点,因为 readlines()
是阻塞的,上面将缓冲所有输出并立即发送。如果这不是你想要的,那么你必须使用 readline()
并确保在命令结束时你读完了之前没有读过的所有行。
asyncio有subprocess_exec,完全不用subprocess模块:
import asyncio
class Handler(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
# fd == 1 for stdout, and 2 for stderr
print("Data from /bin/ls on fd %d: %s" % (fd, data.decode()))
def pipe_connection_lost(self, fd, exc):
print("Connection lost to /bin/ls")
def process_exited(self):
print("/bin/ls is finished.")
loop = asyncio.get_event_loop()
coro = loop.subprocess_exec(Handler, "/bin/ls", "/")
loop.run_until_complete(coro)
loop.close()
有了子进程和线程,也很简单。您可以只为每个管道生成一个线程,并为进程生成一个线程 wait()
:
import subprocess
import threading
class PopenWrapper(object):
def __init__(self, args):
self.process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL)
self.stdout_reader_thread = threading.Thread(target=self._reader, args=(self.process.stdout,))
self.stderr_reader_thread = threading.Thread(target=self._reader, args=(self.process.stderr,))
self.exit_watcher = threading.Thread(target=self._exit_watcher)
self.stdout_reader_thread.start()
self.stderr_reader_thread.start()
self.exit_watcher.start()
def _reader(self, fileobj):
for line in fileobj:
self.on_data(fileobj, line)
def _exit_watcher(self):
self.process.wait()
self.stdout_reader_thread.join()
self.stderr_reader_thread.join()
self.on_exit()
def on_data(self, fd, data):
return NotImplementedError
def on_exit(self):
return NotImplementedError
def join(self):
self.process.wait()
class LsWrapper(PopenWrapper):
def on_data(self, fd, data):
print("Received on fd %r: %s" % (fd, data))
def on_exit(self):
print("Process exited.")
LsWrapper(["/bin/ls", "/"]).join()
但是,请注意 glib 不会 使用线程异步执行回调。它使用事件循环,就像 asyncio 一样。这个想法是,在您的程序的核心是一个循环,该循环等待直到某些事情发生,然后同步执行关联的回调。在您的例子中,是 "data becomes available for reading on one of the pipes" 和 "the subprocess has exited"。通常,它还有 "the X11-server reported mouse movement"、"there's incoming network traffic" 等内容。您可以通过编写自己的事件循环来模拟 glib 的行为。在两个管道上使用 select
module。如果 select 报告管道可读,但 read
returns 没有数据,则进程可能已退出 - 在这种情况下调用子进程对象上的 poll()
方法来检查是否它已完成,如果有则调用您的退出回调,否则调用错误回调。