如何使子进程只通信错误
How to make subprocess only communicate error
我们创建了一个在许多项目中使用的商品函数,它使用子进程来启动一个命令。这个函数如下:
def _popen( command_list ):
p = subprocess.Popen( command_list, stdout=subprocess.PIPE,
stderr=subprocess.PIPE )
out, error_msg = p.communicate()
# Some processes (e.g. system_start) print a number of dots in stderr
# even when no error occurs.
if error_msg.strip('.') == '':
error_msg = ''
return out, error_msg
对于大多数进程,这按预期工作。
但现在我必须将它与需要保持 运行 的后台进程一起使用,只要我的 python-脚本也是 运行,因此现在很有趣开始 ;-).
注意:脚本还需要使用相同的 _popen 函数启动其他非后台进程。
我知道通过跳过 p.communicate 我可以让进程在后台启动,而我的 python 脚本继续。
但这有两个问题:
- 我需要检查后台进程是否正确启动
- 虽然主进程是运行我需要不时检查后台进程的stdout和stderr,不停止进程/结束挂在后台进程
检查后台进程是否正确启动
对于 1,我目前修改了 _popen 版本以采用额外参数 'skip_com'(默认为 False)来跳过 p.communicate 调用。在那种情况下,我 return p-object i.s.o。出和 error_msg。
这样我就可以在启动后直接检查进程是否为 运行,如果不是,则调用 p 对象上的通信来检查 error_msg 是什么。
MY_COMMAND_LIST = [ "<command that should go to background>" ]
def _popen( command_list, skip_com=False ):
p = subprocess.Popen( command_list, stdout=subprocess.PIPE,
stderr=subprocess.PIPE )
if not skip_com:
out, error_msg = p.communicate()
# Some processes (e.g. system_start) print a number of dots in stderr
# even when no error occurs.
if error_msg.strip('.') == '':
error_msg = ''
return out, error_msg
else:
return p
...
p = _popen( MY_COMMAND_LIST, True )
error = _get_command_pid( MY_COMMAND_LIST ) # checks if background command is running using _popen and ps -ef
if error:
_, error_msg = p.communicate()
不知道有没有更好的办法
检查标准输出/标准错误
对于2我还没有找到不会导致脚本等待后台进程结束的解决方案。
我知道的唯一交流方式是在例如 iter 上使用p.stdout.readline。但是如果进程仍然是 运行:
就会挂起
for line in iter( p.stdout.readline, "" ): print line
有人知道怎么做吗?
/edit/ 我需要分别检查从 stdout 和 stderr 获得的数据。在这种情况下,stderr 尤其重要,因为如果后台进程遇到错误,它将退出,我需要在我的主程序中捕获它,以防止由该退出引起的错误。
在某些情况下需要 stdout 输出来检查后台进程中的预期行为并对此做出反应。
它可能比你想象的更好或更坏...
无论如何,逐行读取管道的正确方法很简单:
for line in p.stdout:
#process line is you want of just
print line
或者如果您需要在更高级别的循环中处理它
line = next(p.stdout)
但是从 Python 开始的命令可能会带来更棘手的问题。许多程序使用底层 C 标准库,默认情况下 stdout 是缓冲流。系统检测标准输出是否连接到终端,并自动刷新新行(\n
)或同一终端上的读取输出。但是如果输出连接到管道或文件,所有内容都会被缓冲直到缓冲区已满,这在当前系统上需要几 kBytes。在那种情况下,在 Python 级别 什么也做不了。上面的代码一旦写在管道上就会得到一个完整的行,但是在被调用者实际写了一些东西之前无法猜测...
更新
The subprocess will actually exit if it encounters an error
如果您不需要读取输出来检测错误,那么 redirect it to DEVNULL
并调用 .poll()
不时地检查子进程的状态 而无需停止进程.
假设您必须读取输出:
不要使用 stdout=PIPE, stderr=PIPE 除非你从管道读取。 否则,子进程可能挂起一旦任何相应的 OS 管道缓冲区填满。
如果您想启动一个进程并在它处于 运行 时执行其他操作,那么您需要 a non-blocking way to read its output。一种简单的可移植方法是使用线程:
def process_output(process):
with finishing(process): # close pipes, call .wait()
for line in iter(process.stdout.readline, b''):
if detected_error(line):
communicate_error(process, line)
process = Popen(command, stdout=PIPE, stderr=STDOUT, bufsize=1)
Thread(target=process_output, args=[process]).start()
I need to check the data I get from stdout and stderr seperately.
使用两个线程:
def read_stdout(process):
with waiting(process), process.stdout: # close pipe, call .wait()
for line in iter(process.stdout.readline, b''):
do_something_with_stdout(line)
def read_stderr(process):
with process.stderr:
for line in iter(process.stderr.readline, b''):
if detected_error(line):
communicate_error(process, line)
process = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=1)
Thread(target=read_stdout, args=[process]).start()
Thread(target=read_stderr, args=[process]).start()
您可以将代码放入自定义 class(对 do_something_with_stdout()
、detected_error()
、communicate_error()
方法进行分组)。
我们创建了一个在许多项目中使用的商品函数,它使用子进程来启动一个命令。这个函数如下:
def _popen( command_list ):
p = subprocess.Popen( command_list, stdout=subprocess.PIPE,
stderr=subprocess.PIPE )
out, error_msg = p.communicate()
# Some processes (e.g. system_start) print a number of dots in stderr
# even when no error occurs.
if error_msg.strip('.') == '':
error_msg = ''
return out, error_msg
对于大多数进程,这按预期工作。
但现在我必须将它与需要保持 运行 的后台进程一起使用,只要我的 python-脚本也是 运行,因此现在很有趣开始 ;-).
注意:脚本还需要使用相同的 _popen 函数启动其他非后台进程。
我知道通过跳过 p.communicate 我可以让进程在后台启动,而我的 python 脚本继续。
但这有两个问题:
- 我需要检查后台进程是否正确启动
- 虽然主进程是运行我需要不时检查后台进程的stdout和stderr,不停止进程/结束挂在后台进程
检查后台进程是否正确启动
对于 1,我目前修改了 _popen 版本以采用额外参数 'skip_com'(默认为 False)来跳过 p.communicate 调用。在那种情况下,我 return p-object i.s.o。出和 error_msg。
这样我就可以在启动后直接检查进程是否为 运行,如果不是,则调用 p 对象上的通信来检查 error_msg 是什么。
MY_COMMAND_LIST = [ "<command that should go to background>" ]
def _popen( command_list, skip_com=False ):
p = subprocess.Popen( command_list, stdout=subprocess.PIPE,
stderr=subprocess.PIPE )
if not skip_com:
out, error_msg = p.communicate()
# Some processes (e.g. system_start) print a number of dots in stderr
# even when no error occurs.
if error_msg.strip('.') == '':
error_msg = ''
return out, error_msg
else:
return p
...
p = _popen( MY_COMMAND_LIST, True )
error = _get_command_pid( MY_COMMAND_LIST ) # checks if background command is running using _popen and ps -ef
if error:
_, error_msg = p.communicate()
不知道有没有更好的办法
检查标准输出/标准错误
对于2我还没有找到不会导致脚本等待后台进程结束的解决方案。
我知道的唯一交流方式是在例如 iter 上使用p.stdout.readline。但是如果进程仍然是 运行:
for line in iter( p.stdout.readline, "" ): print line
有人知道怎么做吗?
/edit/ 我需要分别检查从 stdout 和 stderr 获得的数据。在这种情况下,stderr 尤其重要,因为如果后台进程遇到错误,它将退出,我需要在我的主程序中捕获它,以防止由该退出引起的错误。
在某些情况下需要 stdout 输出来检查后台进程中的预期行为并对此做出反应。
它可能比你想象的更好或更坏...
无论如何,逐行读取管道的正确方法很简单:
for line in p.stdout:
#process line is you want of just
print line
或者如果您需要在更高级别的循环中处理它
line = next(p.stdout)
但是从 Python 开始的命令可能会带来更棘手的问题。许多程序使用底层 C 标准库,默认情况下 stdout 是缓冲流。系统检测标准输出是否连接到终端,并自动刷新新行(\n
)或同一终端上的读取输出。但是如果输出连接到管道或文件,所有内容都会被缓冲直到缓冲区已满,这在当前系统上需要几 kBytes。在那种情况下,在 Python 级别 什么也做不了。上面的代码一旦写在管道上就会得到一个完整的行,但是在被调用者实际写了一些东西之前无法猜测...
更新
The subprocess will actually exit if it encounters an error
如果您不需要读取输出来检测错误,那么 redirect it to DEVNULL
并调用 .poll()
不时地检查子进程的状态 而无需停止进程.
假设您必须读取输出:
不要使用 stdout=PIPE, stderr=PIPE 除非你从管道读取。 否则,子进程可能挂起一旦任何相应的 OS 管道缓冲区填满。
如果您想启动一个进程并在它处于 运行 时执行其他操作,那么您需要 a non-blocking way to read its output。一种简单的可移植方法是使用线程:
def process_output(process):
with finishing(process): # close pipes, call .wait()
for line in iter(process.stdout.readline, b''):
if detected_error(line):
communicate_error(process, line)
process = Popen(command, stdout=PIPE, stderr=STDOUT, bufsize=1)
Thread(target=process_output, args=[process]).start()
I need to check the data I get from stdout and stderr seperately.
使用两个线程:
def read_stdout(process):
with waiting(process), process.stdout: # close pipe, call .wait()
for line in iter(process.stdout.readline, b''):
do_something_with_stdout(line)
def read_stderr(process):
with process.stderr:
for line in iter(process.stderr.readline, b''):
if detected_error(line):
communicate_error(process, line)
process = Popen(command, stdout=PIPE, stderr=PIPE, bufsize=1)
Thread(target=read_stdout, args=[process]).start()
Thread(target=read_stderr, args=[process]).start()
您可以将代码放入自定义 class(对 do_something_with_stdout()
、detected_error()
、communicate_error()
方法进行分组)。