确定自进程上次输出以来的时间 - 使用 subprocess.Popen
Determining time since process's last output - with subprocess.Popen
我正在为测试套件中的进程编写某种看门狗。我需要确定测试是否挂起。
我可以简单地用 subprocess.Popen(...)
开始这个过程,然后使用 Popen.wait(timeout=to)
或 Popen.poll()
并保留我自己的计时器。然而,这些测试在执行时间上差异很大,这使得不可能有一个对所有测试都有意义的好的 'timeout' 值。
我发现确定测试是否挂起的一个好方法是 'timeout' 最后一次进程输出任何内容。为此,我考虑使用
process = subprocess.Popen(args='<program>', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ...)
和Popen.communicate()
,判断什么时候stdout
and/orstderr
不是None
。问题是 Popen.communicate()
,没有 'timeout' 只会等到进程终止,而 'timeout' 会引发 TimeoutExpired
异常,我无法确定如果有任何内容被读取。 TimeoutExpired.output
是空的,顺便说一句。
我在文档中找不到任何允许手动执行 'reads' 的内容。此外,该过程通常有很多输出,因此以 stdout=<open_file_descriptor>
启动它会有好处,因为我不担心管道缓冲区溢出。
Update/Solution:
Popen.stdout
和Popen.stderr
return一个"readable stream object",哪个可以用来手动poll/select和阅读。我最终使用 select 'Polling Objects',它使用 poll()
系统调用,如下所示:
import os
import select
import subprocess
p = subprocess.Popen(args="<program>", shell=True, universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
poll_obj = select.poll()
poll_obj.register(p.stdout, select.POLLIN)
poll_obj.register(p.stderr, select.POLLIN)
while p.poll() is None:
events = True
while events:
events = poll_obj.poll(10)
for fd, event in events:
if event & select.POLLIN:
print("STDOUT: " if fd == p.stdout.fileno() else "STDERR: ")
print(os.read(fd, 1024).decode())
# else some other error (see 'Polling Objects')
这有点被覆盖了here..
基本上你需要使用 select()
来轮询 fd 以查看它们是否有输入:
#!/usr/bin/python
import fcntl import os import select import subprocess
def setnonblocking(fd):
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return fd
p = subprocess.Popen("/bin/sh -c 'c=10; while [ $c -gt 0 ]; do echo $c hello; sleep 1; >&2 echo world; sleep 1; let c=$c-1; done'", stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
process_fds = map(setnonblocking, [p.stdout, p.stderr])
while process_fds:
readable, writable, exceptional = select.select(process_fds, [], process_fds, 100)
print "Select: ", readable, writable, exceptional
print "Exitcode: ", p.poll()
for fd in readable:
data = os.read(fd.fileno(), 1024)
if data == "": # EOF
process_fds.remove(fd)
continue
if fd == p.stdout:
print "STDOUT: ",
if fd == p.stderr:
print "STDERR: ",
print data,
for fd in exceptional:
process_fds.remove(fd)
输出:
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>] [] []
Exitcode: None
STDOUT: 10 hello
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode: None
STDERR: world
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>] [] []
Exitcode: None
STDOUT: 9 hello
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode: None
[...]
STDOUT: 1 hello
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode: None
STDERR: world
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>, <open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode: 1
使用 os.read()
而不是 fd.read()
因为您需要以非面向行的方式阅读。 fd.read()
等到找到换行符——但你可能会阻塞。使用此方法,您还可以拆分 stderr
和 stdout
.
编辑:已修改以处理在 p.stdout
和 p.stderr
的 EOF
之前退出的进程
以下是在 Python 3 中在 Unix 上实现 "timeout since the subprocess' last output" 的方法:
#!/usr/bin/env python3
import os
import selectors
import sys
from subprocess import Popen, PIPE, _PopenSelector as Selector
timeout = 1 # seconds
with Popen([sys.executable, '-c', '''import time
for i in range(10): # dummy script
time.sleep(i)
print(i, flush=True)
'''], stdout=PIPE, stderr=PIPE) as process:
pipes = {process.stdout: 1, process.stderr: 2} # where to echo data
with Selector() as sel:
for pipe in pipes:
os.set_blocking(pipe.fileno(), False)
sel.register(pipe, selectors.EVENT_READ)
while pipes:
events = sel.select(timeout)
if not events: # timeout
process.kill()
for key, mask in events:
assert mask == selectors.EVENT_READ
data = os.read(key.fd, 512)
if data == b'': # EOF
sel.unregister(key.fileobj)
del pipes[key.fileobj]
else: # echo data
os.write(pipes[key.fileobj], data)
注意:循环没有在 process.poll()
终止——没有数据丢失。该代码使用 subprocess
作者喜欢的相同选择器,否则可以使用 sel = selectors.DefaultSelector()
。如果孙子进程可能会继承管道,那么您应该更积极地打破超时循环(). To implement os.set_blocking()
before Python 3.5, you could use fcntl
:
from fcntl import fcntl, F_GETFL, F_SETFL
def set_nonblocking(fd):
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK) # set O_NONBLOCK
我正在为测试套件中的进程编写某种看门狗。我需要确定测试是否挂起。
我可以简单地用 subprocess.Popen(...)
开始这个过程,然后使用 Popen.wait(timeout=to)
或 Popen.poll()
并保留我自己的计时器。然而,这些测试在执行时间上差异很大,这使得不可能有一个对所有测试都有意义的好的 'timeout' 值。
我发现确定测试是否挂起的一个好方法是 'timeout' 最后一次进程输出任何内容。为此,我考虑使用
process = subprocess.Popen(args='<program>', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ...)
和Popen.communicate()
,判断什么时候stdout
and/orstderr
不是None
。问题是 Popen.communicate()
,没有 'timeout' 只会等到进程终止,而 'timeout' 会引发 TimeoutExpired
异常,我无法确定如果有任何内容被读取。 TimeoutExpired.output
是空的,顺便说一句。
我在文档中找不到任何允许手动执行 'reads' 的内容。此外,该过程通常有很多输出,因此以 stdout=<open_file_descriptor>
启动它会有好处,因为我不担心管道缓冲区溢出。
Update/Solution:
Popen.stdout
和Popen.stderr
return一个"readable stream object",哪个可以用来手动poll/select和阅读。我最终使用 select 'Polling Objects',它使用 poll()
系统调用,如下所示:
import os
import select
import subprocess
p = subprocess.Popen(args="<program>", shell=True, universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
poll_obj = select.poll()
poll_obj.register(p.stdout, select.POLLIN)
poll_obj.register(p.stderr, select.POLLIN)
while p.poll() is None:
events = True
while events:
events = poll_obj.poll(10)
for fd, event in events:
if event & select.POLLIN:
print("STDOUT: " if fd == p.stdout.fileno() else "STDERR: ")
print(os.read(fd, 1024).decode())
# else some other error (see 'Polling Objects')
这有点被覆盖了here..
基本上你需要使用 select()
来轮询 fd 以查看它们是否有输入:
#!/usr/bin/python
import fcntl import os import select import subprocess
def setnonblocking(fd):
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return fd
p = subprocess.Popen("/bin/sh -c 'c=10; while [ $c -gt 0 ]; do echo $c hello; sleep 1; >&2 echo world; sleep 1; let c=$c-1; done'", stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
process_fds = map(setnonblocking, [p.stdout, p.stderr])
while process_fds:
readable, writable, exceptional = select.select(process_fds, [], process_fds, 100)
print "Select: ", readable, writable, exceptional
print "Exitcode: ", p.poll()
for fd in readable:
data = os.read(fd.fileno(), 1024)
if data == "": # EOF
process_fds.remove(fd)
continue
if fd == p.stdout:
print "STDOUT: ",
if fd == p.stderr:
print "STDERR: ",
print data,
for fd in exceptional:
process_fds.remove(fd)
输出:
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>] [] []
Exitcode: None
STDOUT: 10 hello
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode: None
STDERR: world
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>] [] []
Exitcode: None
STDOUT: 9 hello
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode: None
[...]
STDOUT: 1 hello
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode: None
STDERR: world
Select: [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>, <open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode: 1
使用 os.read()
而不是 fd.read()
因为您需要以非面向行的方式阅读。 fd.read()
等到找到换行符——但你可能会阻塞。使用此方法,您还可以拆分 stderr
和 stdout
.
编辑:已修改以处理在 p.stdout
和 p.stderr
EOF
之前退出的进程
以下是在 Python 3 中在 Unix 上实现 "timeout since the subprocess' last output" 的方法:
#!/usr/bin/env python3
import os
import selectors
import sys
from subprocess import Popen, PIPE, _PopenSelector as Selector
timeout = 1 # seconds
with Popen([sys.executable, '-c', '''import time
for i in range(10): # dummy script
time.sleep(i)
print(i, flush=True)
'''], stdout=PIPE, stderr=PIPE) as process:
pipes = {process.stdout: 1, process.stderr: 2} # where to echo data
with Selector() as sel:
for pipe in pipes:
os.set_blocking(pipe.fileno(), False)
sel.register(pipe, selectors.EVENT_READ)
while pipes:
events = sel.select(timeout)
if not events: # timeout
process.kill()
for key, mask in events:
assert mask == selectors.EVENT_READ
data = os.read(key.fd, 512)
if data == b'': # EOF
sel.unregister(key.fileobj)
del pipes[key.fileobj]
else: # echo data
os.write(pipes[key.fileobj], data)
注意:循环没有在 process.poll()
终止——没有数据丢失。该代码使用 subprocess
作者喜欢的相同选择器,否则可以使用 sel = selectors.DefaultSelector()
。如果孙子进程可能会继承管道,那么您应该更积极地打破超时循环(os.set_blocking()
before Python 3.5, you could use fcntl
:
from fcntl import fcntl, F_GETFL, F_SETFL
def set_nonblocking(fd):
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK) # set O_NONBLOCK