硬杀死一个 python 子线程
Hard kill a python sub-thread
我有一个 python 脚本,它通过 shell 命令与 C++ 程序通信。
Python 脚本调用 C++ 程序并通过管道获取响应。
C++ 程序会缓冲自己的输出,导致从管道读取的线程被阻塞。我用下面这个类解决了这些问题:
import os
import subprocess
import threading
import queue
import time
import pty
class DaemonCall():
    """Run a shell command on pseudo-terminals and collect its output asynchronously.

    The child's stdout and stderr are each attached to the slave side of a
    pty.  For each stream a daemon thread reads lines from the master side and
    puts them on a ``queue.Queue`` that the caller can poll without blocking.
    """

    def __init__(self):
        self.popen = None         # subprocess.Popen of the running command
        self.stdoutQueue = None   # queue.Queue fed with stdout lines (str)
        self.stderrQueue = None   # queue.Queue fed with stderr lines (str)
        self.stdoutThread = None  # reader thread for stdout
        self.stderrThread = None  # reader thread for stderr

    def __del__(self):
        """Kill and reap the child so the reader threads can reach end-of-file."""
        # getattr: __del__ may run on an instance whose __init__ never completed.
        popen = getattr(self, 'popen', None)
        if popen is not None and popen.poll() is None:
            popen.kill()
            popen.wait()

    def call(self, command):
        """
        Start ``command`` through the shell and begin reading its output.

        :param command: shell command line to execute
        :type command: str
        :rtype: None
        """
        masterStdout, slaveStdout = pty.openpty()
        masterStderr, slaveStderr = pty.openpty()
        try:
            self.popen = subprocess.Popen(command, shell=True, stdout=slaveStdout, stderr=slaveStderr, bufsize=0)
        except BaseException:
            # Nothing will ever read the masters if the spawn failed.
            os.close(masterStdout)
            os.close(masterStderr)
            raise
        finally:
            # The child holds its own copies of the slave ends.  Keeping ours
            # open would leak descriptors and stop the readers from ever
            # observing end-of-file after the child exits.
            os.close(slaveStdout)
            os.close(slaveStderr)
        self.stdoutQueue, self.stdoutThread = self.getAsyncReadQueue(masterStdout)
        self.stderrQueue, self.stderrThread = self.getAsyncReadQueue(masterStderr)

    @classmethod
    def getAsyncReadQueue(cls, source):
        """
        Start a daemon thread that reads lines from a file descriptor into a queue.

        :param source: file descriptor to read from (opened in text mode)
        :type source: int
        :return: the queue receiving the lines and the thread filling it
        :rtype: (queue.Queue, threading.Thread)
        """
        newQueue = queue.Queue()
        newThread = threading.Thread(target=cls.enqueueOutput, args=(os.fdopen(source), newQueue))
        newThread.daemon = True  # thread dies with the program
        newThread.start()
        return newQueue, newThread

    @staticmethod
    def enqueueOutput(pipe, outputQueue):
        """
        Copy lines from ``pipe`` to ``outputQueue`` until end-of-file, then close ``pipe``.

        :param pipe: file object to read lines from
        :param outputQueue: queue that receives every line read
        :type outputQueue: queue.Queue
        :rtype: None
        """
        import errno

        try:
            while True:
                newLine = pipe.readline()
                # An empty result ('' in text mode, b'' in binary mode) is EOF.
                if not newLine:
                    break
                outputQueue.put(newLine)
        except OSError as error:
            # NOTE(review): on Linux a pty master read typically fails with EIO
            # once all slave descriptors are closed; treat that as EOF.
            if error.errno != errno.EIO:
                raise
        finally:
            pipe.close()
# Demo: start a command, give it a second to produce output, then poll the
# stdout queue once without blocking.
callWrapper = DaemonCall()
callWrapper.call('some shell command')
time.sleep(1)
try:
    # Non-blocking fetch; raises queue.Empty when nothing has arrived yet.
    line = callWrapper.stdoutQueue.get(block=False)
except queue.Empty:
    print('no output yet')
else:
    print(line)
现在我有另一个问题 - 每次调用都会创建两个线程来从管道中读取数据,这两个线程被 C++ 程序阻塞并一直存在到脚本结束。我需要一种方法来终止此类进程。
最理想的是——在 __del__ 方法中加入一些代码来完成这件事。
有什么想法可以杀死在从管道读取数据时阻塞的线程吗?
以上代码运行在 Ubuntu 14.04、Python 3.4 环境下。
只需终止子进程:self.popen.kill(); self.popen.wait()。
线程会自动退出(进程结束时,打开的管道等资源会被释放——pipe.readline() 应该返回一个空结果,即 EOF)。
不过对于 pty.openpty() 这种做法可能不起作用——在这种情况下请手动关闭 pty 的文件描述符。
您已经在使用 pty
(不可移植行为)因此您不需要线程(以获得可移植行为):您可以使用 pexpect
模块(围绕pty
) 或 fcntl
(非阻塞读取),或 select
(一次等待多个 fd 超时),或 asyncio
代替。查看代码示例:
- Python: read streaming input from subprocess.communicate() -- 一个简单的代码示例,如果子进程足够频繁地刷新其标准输出缓冲区,它就可以工作。注意:在 Python 3 上不需要
for line in iter(pipe.readline, b'')
(预读错误已修复)即,您可以使用 for line in pipe
代替
- Python subprocess readlines() hangs -- 显示如何强制行缓冲行为
- Python C program subprocess hangs at “for line in iter” -- 关于块缓冲问题的更多信息 -- 如果可以修改子进程的其他解决方案
- Non-blocking read on a subprocess.PIPE in python -- 查看所有解决方案(包括不可移植的 -- 你不需要 Windows 支持)
- Subprocess.Popen: cloning stdout and stderr both to terminal and variables -- 显示如何使用
asyncio
同时读取两个 stdout/stderr
我创建了一个 class 来通过管道与其他进程通信。
Class 创建单独的线程,这些线程 read/write 通过管道传输并使用异步队列与您的线程进行通信。
它是我在项目中使用的经过验证的解决方案
import time
import subprocess
import queue
import threading
# Default interval (seconds) between two checks of a running command.
TIMEOUT_POLLINGINTERVAL = 0.5


class ShellCall():
    """Run a command with piped stdout/stderr and wait for completion conditions.

    ``call`` starts the process and two daemon threads that copy the raw lines
    of stdout and stderr into queues.  The ``wait*`` methods poll until the
    requested conditions hold and raise ``ShellException`` on error or timeout.
    """

    def __init__(self):
        self._popen = None        # subprocess.Popen, set by call()
        self._stdOutQueue = None  # queue.Queue of raw stdout lines (bytes)
        self._stdErrQueue = None  # queue.Queue of raw stderr lines (bytes)
        self._stdOut = []         # decoded stdout lines collected so far
        self._stdErr = []         # decoded stderr lines collected so far

    def __del__(self):
        """Kill the child if it is still running and reap it."""
        # getattr: __del__ may run on an instance whose __init__ never completed.
        popen = getattr(self, '_popen', None)
        if popen is not None and popen.poll() is None:
            popen.kill()
            popen.wait()  # reap the child so it does not linger as a zombie

    @staticmethod
    def _toBytes(item):
        """Return ``item`` UTF-8 encoded; bytes are passed through unchanged."""
        return item if isinstance(item, bytes) else item.encode('utf-8')

    def call(self, command, shell=False):
        """
        Execute a shell command
        :param command: command to be executed
        :type command: str | list[str]
        :param shell: If shell is True, the specified command will be executed through the shell
        :type shell: bool
        :rtype: None
        """
        if shell or isinstance(command, (str, bytes)):
            # A single string is one command (or program name); iterating over
            # it would split it into single characters.
            command = self._toBytes(command)
        else:
            command = [self._toBytes(item) for item in command]
        self._popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, bufsize=0)
        self._stdOutQueue = self._getAsyncReadQueue(self._popen.stdout)
        self._stdErrQueue = self._getAsyncReadQueue(self._popen.stderr)

    def _getAsyncReadQueue(self, sourcePipe):
        """
        Create a thread to read from pipe in asynchronous mode, get queue to receive data from pipe
        :param sourcePipe: Pipe to read from
        :type sourcePipe: pipe
        :return: Queue to receive read data
        :rtype: queue.Queue
        """
        newQueue = queue.Queue()
        newThread = threading.Thread(target=self._enqueueOutput, args=(sourcePipe, newQueue))
        newThread.daemon = True  # thread dies with the program
        newThread.start()
        return newQueue

    @staticmethod
    def _enqueueOutput(sourcePipe, outputQueue):
        """
        Read from pipe and write to the queue
        :param sourcePipe: Pipe to read from
        :type sourcePipe: pipe
        :param outputQueue: Queue to write to
        :type outputQueue: queue.Queue
        """
        # b'' from readline() means end-of-file on the binary pipe.
        for line in iter(sourcePipe.readline, b''):
            outputQueue.put(line)

    def waitNotNoneReturnCode(self, timeout, *, checkCallback=None):
        """
        Wait until any return code
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, checkCallback=checkCallback)

    def waitNoErrorReturnCode(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0'. Otherwise raise ShellException
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True, checkCallback=checkCallback)

    def waitNoStdErr(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0' and empty stderr. Otherwise raise ShellException
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True, noStdErr=True, checkCallback=checkCallback)

    def waitStdOut(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0', empty stderr and not empty stdout. Otherwise raise ShellException
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True,
                   noStdErr=True, stdOut=True, checkCallback=checkCallback)

    def _wait(self, timeout, *, pollingTime=TIMEOUT_POLLINGINTERVAL,
              notNoneReturnCode=False, noErrorReturnCode=False, noStdErr=False, stdOut=False, checkCallback=None):
        """
        Raise ShellException if conditions not satisfied (see :func:`_checkCallResults`).
        Raise ShellException if conditions not satisfied too long.
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param pollingTime: Time interval length to check result of command execution
        :type pollingTime: float
        :rtype: None
        """
        # Monotonic clock: the timeout must not be affected by wall-clock changes.
        startTime = time.monotonic()
        while True:
            if self._checkCallResults(notNoneReturnCode=notNoneReturnCode, noErrorReturnCode=noErrorReturnCode,
                                      noStdErr=noStdErr, stdOut=stdOut, checkCallback=checkCallback):
                return
            # exception due to timeout
            if time.monotonic() - startTime > timeout:
                raise ShellException('Shell call not finished too long', self)
            time.sleep(pollingTime)

    def _checkCallResults(self, notNoneReturnCode=False, noErrorReturnCode=False,
                          noStdErr=False, stdOut=False, checkCallback=None):
        """
        Raise ShellException if noErrorReturnCode=True and shell call return not 0 return call
        Raise ShellException if noStdErr=True and shell call print anything to stderr
        :param notNoneReturnCode: return True only if shell call return any return call
        :type notNoneReturnCode: bool
        :param noErrorReturnCode: return True only if shell call return 0 return code
        :type noErrorReturnCode: bool
        :param noStdErr: return True only if shell call print nothing to stderr
        :type noStdErr: bool
        :param stdOut: return True only if shell call print anything to stdout
        :type stdOut: bool
        :param checkCallback: Any callable that will be used to check is shell call finished,
        positional arguments, keyword arguments
        :type checkCallback: callable, args, kwargs
        :return: True if conditions are satisfied
        :rtype: bool
        """
        # Read the return code once so every check below sees the same value.
        returnCode = self.getReturnCode()
        # exceptions
        # "!= 0" rather than "> 0": subprocess reports a child killed by a
        # signal as a negative return code, which is a failure as well.
        if noErrorReturnCode and returnCode is not None and returnCode != 0:
            raise ShellException('Shell call finished with error return code', self)
        if noStdErr and len(self.getStdErr()) > 0:
            raise ShellException('Shell call have non-empty stderr', self)
        # break loop
        notNoneReturnCodeCondition = (returnCode is not None) if notNoneReturnCode else True
        noErrorReturnCodeCondition = (returnCode == 0) if noErrorReturnCode else True
        notStdErrCondition = (len(self.getStdErr()) == 0) if noStdErr else True
        stdOutCondition = (len(self.getStdOut()) > 0) if stdOut else True
        callbackCondition = checkCallback() if checkCallback else True
        return bool(notNoneReturnCodeCondition and noErrorReturnCodeCondition and
                    notStdErrCondition and stdOutCondition and callbackCondition)

    def getReturnCode(self):
        """
        Get return code of the process
        :return: return code of the child process or None if process is not started or not terminated yet
        :rtype: int|None
        """
        if self._popen is None:
            return None
        return self._popen.poll()

    def getStdOut(self):
        """
        Get list with stdout lines
        :rtype: list[str]
        """
        self._stdOut += self._readAllQueue(self._stdOutQueue)
        return self._stdOut

    def getStdErr(self):
        """
        Get list with stderr lines
        :rtype: list[str]
        """
        self._stdErr += self._readAllQueue(self._stdErrQueue)
        return self._stdErr

    @staticmethod
    def _readAllQueue(sourceQueue):
        """
        Drain every line currently in the queue without blocking
        :param sourceQueue: queue of raw lines, or None if no command was started yet
        :type sourceQueue: queue.Queue|None
        :return: decoded lines with trailing whitespace removed
        :rtype: list[str]
        """
        lines = []
        if sourceQueue is None:
            return lines
        try:
            while True:
                line = sourceQueue.get_nowait()  # or q.get(timeout=.1)
                lines.append(line.decode('utf-8').rstrip())
        except queue.Empty:
            return lines

    def __repr__(self):
        stdOut = ' '.join(self.getStdOut())
        stdOut = (stdOut[:1000] + '...') if len(stdOut) > 1000 else stdOut
        stdErr = ' '.join(self.getStdErr())
        stdErr = (stdErr[:1000] + '...') if len(stdErr) > 1000 else stdErr
        # repr() must not fail on an instance whose call() was never invoked.
        command = self._popen.args if self._popen is not None else None
        return '<ShellCall(command={}, ReturnCode={}, stdout="{}", stderr="{}")>'. \
            format(command, self.getReturnCode(), stdOut, stdErr)
class ShellException(Exception):
    """Error raised for a failed or timed-out shell call; carries the call object."""

    def __init__(self, description, shellCall):
        """
        :param description: text description of the error
        :type description: str
        :param shellCall: shell call object used to execute a command
        :type shellCall: ShellCall
        :rtype: None
        """
        # Plain super(): super(Exception, self) named the wrong class and
        # skipped Exception itself in the method resolution order.
        super().__init__(description, shellCall)

    def getShellCall(self):
        """
        Get shell call object used to execute a command
        :rtype: ShellCall
        """
        _, shellCall = self.args
        return shellCall
我有一个 python 脚本,它通过 shell 命令与 C++ 程序通信。 Python 脚本调用 C++ 程序并通过管道获取响应。
C++ 程序会缓冲自己的输出,导致从管道读取的线程被阻塞。我用下面这个类解决了这些问题:
import os
import subprocess
import threading
import queue
import time
import pty
class DaemonCall():
    """Run a shell command on pseudo-terminals and collect its output asynchronously.

    The child's stdout and stderr are each attached to the slave side of a
    pty.  For each stream a daemon thread reads lines from the master side and
    puts them on a ``queue.Queue`` that the caller can poll without blocking.
    """

    def __init__(self):
        self.popen = None         # subprocess.Popen of the running command
        self.stdoutQueue = None   # queue.Queue fed with stdout lines (str)
        self.stderrQueue = None   # queue.Queue fed with stderr lines (str)
        self.stdoutThread = None  # reader thread for stdout
        self.stderrThread = None  # reader thread for stderr

    def __del__(self):
        """Kill and reap the child so the reader threads can reach end-of-file."""
        # getattr: __del__ may run on an instance whose __init__ never completed.
        popen = getattr(self, 'popen', None)
        if popen is not None and popen.poll() is None:
            popen.kill()
            popen.wait()

    def call(self, command):
        """
        Start ``command`` through the shell and begin reading its output.

        :param command: shell command line to execute
        :type command: str
        :rtype: None
        """
        masterStdout, slaveStdout = pty.openpty()
        masterStderr, slaveStderr = pty.openpty()
        try:
            self.popen = subprocess.Popen(command, shell=True, stdout=slaveStdout, stderr=slaveStderr, bufsize=0)
        except BaseException:
            # Nothing will ever read the masters if the spawn failed.
            os.close(masterStdout)
            os.close(masterStderr)
            raise
        finally:
            # The child holds its own copies of the slave ends.  Keeping ours
            # open would leak descriptors and stop the readers from ever
            # observing end-of-file after the child exits.
            os.close(slaveStdout)
            os.close(slaveStderr)
        self.stdoutQueue, self.stdoutThread = self.getAsyncReadQueue(masterStdout)
        self.stderrQueue, self.stderrThread = self.getAsyncReadQueue(masterStderr)

    @classmethod
    def getAsyncReadQueue(cls, source):
        """
        Start a daemon thread that reads lines from a file descriptor into a queue.

        :param source: file descriptor to read from (opened in text mode)
        :type source: int
        :return: the queue receiving the lines and the thread filling it
        :rtype: (queue.Queue, threading.Thread)
        """
        newQueue = queue.Queue()
        newThread = threading.Thread(target=cls.enqueueOutput, args=(os.fdopen(source), newQueue))
        newThread.daemon = True  # thread dies with the program
        newThread.start()
        return newQueue, newThread

    @staticmethod
    def enqueueOutput(pipe, outputQueue):
        """
        Copy lines from ``pipe`` to ``outputQueue`` until end-of-file, then close ``pipe``.

        :param pipe: file object to read lines from
        :param outputQueue: queue that receives every line read
        :type outputQueue: queue.Queue
        :rtype: None
        """
        import errno

        try:
            while True:
                newLine = pipe.readline()
                # An empty result ('' in text mode, b'' in binary mode) is EOF.
                if not newLine:
                    break
                outputQueue.put(newLine)
        except OSError as error:
            # NOTE(review): on Linux a pty master read typically fails with EIO
            # once all slave descriptors are closed; treat that as EOF.
            if error.errno != errno.EIO:
                raise
        finally:
            pipe.close()
# Demo: start a command, give it a second to produce output, then poll the
# stdout queue once without blocking.
callWrapper = DaemonCall()
callWrapper.call('some shell command')
time.sleep(1)
try:
    # Non-blocking fetch; raises queue.Empty when nothing has arrived yet.
    line = callWrapper.stdoutQueue.get(block=False)
except queue.Empty:
    print('no output yet')
else:
    print(line)
现在我有另一个问题 - 每次调用都会创建两个线程来从管道中读取数据,这两个线程被 C++ 程序阻塞并一直存在到脚本结束。我需要一种方法来终止此类进程。
最理想的是——在 __del__ 方法中加入一些代码来完成这件事。
有什么想法可以杀死在从管道读取数据时阻塞的线程吗?
以上代码运行在 Ubuntu 14.04、Python 3.4 环境下。
只需终止子进程:self.popen.kill(); self.popen.wait()。
线程会自动退出(进程结束时,打开的管道等资源会被释放——pipe.readline() 应该返回一个空结果,即 EOF)。
不过对于 pty.openpty() 这种做法可能不起作用——在这种情况下请手动关闭 pty 的文件描述符。
您已经在使用 pty
(不可移植行为)因此您不需要线程(以获得可移植行为):您可以使用 pexpect
模块(围绕pty
) 或 fcntl
(非阻塞读取),或 select
(一次等待多个 fd 超时),或 asyncio
代替。查看代码示例:
- Python: read streaming input from subprocess.communicate() -- 一个简单的代码示例,如果子进程足够频繁地刷新其标准输出缓冲区,它就可以工作。注意:在 Python 3 上不需要
for line in iter(pipe.readline, b'')
(预读错误已修复),也就是说,您可以直接使用 for line in pipe
代替
- Python subprocess readlines() hangs -- 显示如何强制行缓冲行为
- Python C program subprocess hangs at “for line in iter” -- 关于块缓冲问题的更多信息 -- 如果可以修改子进程的其他解决方案
- Non-blocking read on a subprocess.PIPE in python -- 查看所有解决方案(包括不可移植的 -- 你不需要 Windows 支持)
- Subprocess.Popen: cloning stdout and stderr both to terminal and variables -- 显示如何使用
asyncio
同时读取两个 stdout/stderr
我创建了一个 class 来通过管道与其他进程通信。 Class 创建单独的线程,这些线程 read/write 通过管道传输并使用异步队列与您的线程进行通信。 它是我在项目中使用的经过验证的解决方案
import time
import subprocess
import queue
import threading
# Default interval (seconds) between two checks of a running command.
TIMEOUT_POLLINGINTERVAL = 0.5


class ShellCall():
    """Run a command with piped stdout/stderr and wait for completion conditions.

    ``call`` starts the process and two daemon threads that copy the raw lines
    of stdout and stderr into queues.  The ``wait*`` methods poll until the
    requested conditions hold and raise ``ShellException`` on error or timeout.
    """

    def __init__(self):
        self._popen = None        # subprocess.Popen, set by call()
        self._stdOutQueue = None  # queue.Queue of raw stdout lines (bytes)
        self._stdErrQueue = None  # queue.Queue of raw stderr lines (bytes)
        self._stdOut = []         # decoded stdout lines collected so far
        self._stdErr = []         # decoded stderr lines collected so far

    def __del__(self):
        """Kill the child if it is still running and reap it."""
        # getattr: __del__ may run on an instance whose __init__ never completed.
        popen = getattr(self, '_popen', None)
        if popen is not None and popen.poll() is None:
            popen.kill()
            popen.wait()  # reap the child so it does not linger as a zombie

    @staticmethod
    def _toBytes(item):
        """Return ``item`` UTF-8 encoded; bytes are passed through unchanged."""
        return item if isinstance(item, bytes) else item.encode('utf-8')

    def call(self, command, shell=False):
        """
        Execute a shell command
        :param command: command to be executed
        :type command: str | list[str]
        :param shell: If shell is True, the specified command will be executed through the shell
        :type shell: bool
        :rtype: None
        """
        if shell or isinstance(command, (str, bytes)):
            # A single string is one command (or program name); iterating over
            # it would split it into single characters.
            command = self._toBytes(command)
        else:
            command = [self._toBytes(item) for item in command]
        self._popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, bufsize=0)
        self._stdOutQueue = self._getAsyncReadQueue(self._popen.stdout)
        self._stdErrQueue = self._getAsyncReadQueue(self._popen.stderr)

    def _getAsyncReadQueue(self, sourcePipe):
        """
        Create a thread to read from pipe in asynchronous mode, get queue to receive data from pipe
        :param sourcePipe: Pipe to read from
        :type sourcePipe: pipe
        :return: Queue to receive read data
        :rtype: queue.Queue
        """
        newQueue = queue.Queue()
        newThread = threading.Thread(target=self._enqueueOutput, args=(sourcePipe, newQueue))
        newThread.daemon = True  # thread dies with the program
        newThread.start()
        return newQueue

    @staticmethod
    def _enqueueOutput(sourcePipe, outputQueue):
        """
        Read from pipe and write to the queue
        :param sourcePipe: Pipe to read from
        :type sourcePipe: pipe
        :param outputQueue: Queue to write to
        :type outputQueue: queue.Queue
        """
        # b'' from readline() means end-of-file on the binary pipe.
        for line in iter(sourcePipe.readline, b''):
            outputQueue.put(line)

    def waitNotNoneReturnCode(self, timeout, *, checkCallback=None):
        """
        Wait until any return code
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, checkCallback=checkCallback)

    def waitNoErrorReturnCode(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0'. Otherwise raise ShellException
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True, checkCallback=checkCallback)

    def waitNoStdErr(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0' and empty stderr. Otherwise raise ShellException
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True, noStdErr=True, checkCallback=checkCallback)

    def waitStdOut(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0', empty stderr and not empty stdout. Otherwise raise ShellException
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True,
                   noStdErr=True, stdOut=True, checkCallback=checkCallback)

    def _wait(self, timeout, *, pollingTime=TIMEOUT_POLLINGINTERVAL,
              notNoneReturnCode=False, noErrorReturnCode=False, noStdErr=False, stdOut=False, checkCallback=None):
        """
        Raise ShellException if conditions not satisfied (see :func:`_checkCallResults`).
        Raise ShellException if conditions not satisfied too long.
        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param pollingTime: Time interval length to check result of command execution
        :type pollingTime: float
        :rtype: None
        """
        # Monotonic clock: the timeout must not be affected by wall-clock changes.
        startTime = time.monotonic()
        while True:
            if self._checkCallResults(notNoneReturnCode=notNoneReturnCode, noErrorReturnCode=noErrorReturnCode,
                                      noStdErr=noStdErr, stdOut=stdOut, checkCallback=checkCallback):
                return
            # exception due to timeout
            if time.monotonic() - startTime > timeout:
                raise ShellException('Shell call not finished too long', self)
            time.sleep(pollingTime)

    def _checkCallResults(self, notNoneReturnCode=False, noErrorReturnCode=False,
                          noStdErr=False, stdOut=False, checkCallback=None):
        """
        Raise ShellException if noErrorReturnCode=True and shell call return not 0 return call
        Raise ShellException if noStdErr=True and shell call print anything to stderr
        :param notNoneReturnCode: return True only if shell call return any return call
        :type notNoneReturnCode: bool
        :param noErrorReturnCode: return True only if shell call return 0 return code
        :type noErrorReturnCode: bool
        :param noStdErr: return True only if shell call print nothing to stderr
        :type noStdErr: bool
        :param stdOut: return True only if shell call print anything to stdout
        :type stdOut: bool
        :param checkCallback: Any callable that will be used to check is shell call finished,
        positional arguments, keyword arguments
        :type checkCallback: callable, args, kwargs
        :return: True if conditions are satisfied
        :rtype: bool
        """
        # Read the return code once so every check below sees the same value.
        returnCode = self.getReturnCode()
        # exceptions
        # "!= 0" rather than "> 0": subprocess reports a child killed by a
        # signal as a negative return code, which is a failure as well.
        if noErrorReturnCode and returnCode is not None and returnCode != 0:
            raise ShellException('Shell call finished with error return code', self)
        if noStdErr and len(self.getStdErr()) > 0:
            raise ShellException('Shell call have non-empty stderr', self)
        # break loop
        notNoneReturnCodeCondition = (returnCode is not None) if notNoneReturnCode else True
        noErrorReturnCodeCondition = (returnCode == 0) if noErrorReturnCode else True
        notStdErrCondition = (len(self.getStdErr()) == 0) if noStdErr else True
        stdOutCondition = (len(self.getStdOut()) > 0) if stdOut else True
        callbackCondition = checkCallback() if checkCallback else True
        return bool(notNoneReturnCodeCondition and noErrorReturnCodeCondition and
                    notStdErrCondition and stdOutCondition and callbackCondition)

    def getReturnCode(self):
        """
        Get return code of the process
        :return: return code of the child process or None if process is not started or not terminated yet
        :rtype: int|None
        """
        if self._popen is None:
            return None
        return self._popen.poll()

    def getStdOut(self):
        """
        Get list with stdout lines
        :rtype: list[str]
        """
        self._stdOut += self._readAllQueue(self._stdOutQueue)
        return self._stdOut

    def getStdErr(self):
        """
        Get list with stderr lines
        :rtype: list[str]
        """
        self._stdErr += self._readAllQueue(self._stdErrQueue)
        return self._stdErr

    @staticmethod
    def _readAllQueue(sourceQueue):
        """
        Drain every line currently in the queue without blocking
        :param sourceQueue: queue of raw lines, or None if no command was started yet
        :type sourceQueue: queue.Queue|None
        :return: decoded lines with trailing whitespace removed
        :rtype: list[str]
        """
        lines = []
        if sourceQueue is None:
            return lines
        try:
            while True:
                line = sourceQueue.get_nowait()  # or q.get(timeout=.1)
                lines.append(line.decode('utf-8').rstrip())
        except queue.Empty:
            return lines

    def __repr__(self):
        stdOut = ' '.join(self.getStdOut())
        stdOut = (stdOut[:1000] + '...') if len(stdOut) > 1000 else stdOut
        stdErr = ' '.join(self.getStdErr())
        stdErr = (stdErr[:1000] + '...') if len(stdErr) > 1000 else stdErr
        # repr() must not fail on an instance whose call() was never invoked.
        command = self._popen.args if self._popen is not None else None
        return '<ShellCall(command={}, ReturnCode={}, stdout="{}", stderr="{}")>'. \
            format(command, self.getReturnCode(), stdOut, stdErr)
class ShellException(Exception):
    """Error raised for a failed or timed-out shell call; carries the call object."""

    def __init__(self, description, shellCall):
        """
        :param description: text description of the error
        :type description: str
        :param shellCall: shell call object used to execute a command
        :type shellCall: ShellCall
        :rtype: None
        """
        # Plain super(): super(Exception, self) named the wrong class and
        # skipped Exception itself in the method resolution order.
        super().__init__(description, shellCall)

    def getShellCall(self):
        """
        Get shell call object used to execute a command
        :rtype: ShellCall
        """
        _, shellCall = self.args
        return shellCall