Polling/awaiting HUP Popen.stdin
Polling/awaiting HUP on Popen.stdin
我正在 Python 中写一个 Zenity-based status monitor for GNU dd(1)
。
由于我所针对的系统的限制,包装器必须 运行 on Python 2,并且不能引入外部库。
其中一项要求是 Zenity 的“取消”按钮在 dd 尚未完成时终止。
我必须立即执行以下操作(即triggered/driven/immediately);如果以下多个条件met/triggered 同时,它们将按列出的顺序执行:
- 当 Zenity 退出时,终止 dd
- 当 dd 写入其
stderr
时,munge+将该数据转发到 Zenity 的 stdin
- 当 dd 退出时,如果其 return 代码不为 0,则终止 Zenity
但是,epoll 对象似乎只在 dd 的输出上触发;它永远不会在 Zenity 退出时触发,尽管我在 Zenity 的 stdin
.
上注册了 EPOLLHUP
如何should/can做到这一点?我知道 epoll 是唯一可用于正确触发 dd
输出的原语(通过 EPOLLIN
);我也明白这是一个笨拙的原语,可能不适合在 Zenity 退出时触发。 (如果需要,我可以在这个文件中实现更多逻辑;这样做比引入任何第 3 方库要好得多,无论它多么小或“常见”。我重申,我知道 epoll 很难使用,可能需要一个大量的胶合逻辑。)
或者:如果 epoll 不是监视 subprocess
退出的正确原语,那么在监视子进程输出 in 时监视子进程退出的正确方法是什么a Python 2兼容方式?
(我本身并不需要多线程功能;按顺序执行所有操作将完全符合规范;但是,如果多线程编程在这种情况下绝对必要避免忙循环,那就这样吧。)
下面是我目前的完整代码。
#!/usr/bin/env python
from __future__ import division
import sys,os,stat,fcntl,select,subprocess,re
def main(args=sys.argv[1:]):
fname = parseifname(args)
n = sizeof(fname)
dcmd = ['dd'] + args + ['status=progress']
zcmd = ['zenity', '--progress', '--time-remaining']
#Launch dd
dd = subprocess.Popen(dcmd, stderr=subprocess.PIPE)
set_nonblocking(dd.stderr)
#Launch Zenity
zenity = subprocess.Popen(zcmd, stdin=subprocess.PIPE)
set_direct(zenity.stdin)#TODO: why doesn't this line work?*
#set title/status
zenity.stdin.write(('#%s\n' % ' '.join(dcmd)).encode())
zenity.stdin.flush()#*i.e. instances of this line shouldn't be necessary...
#We want to trigger on all of the following:
toPoll = [
(dd.stderr, select.EPOLLIN #dd status update
| select.EPOLLHUP), #dd exit
(zenity.stdin, select.EPOLLHUP), #Zenity exit
]
calcPercent = genCalcPercent(n)
with ePoll(toPoll) as E:
rBytes = re.compile(r'\r(\d+) bytes'.encode())
while dd.poll() is None:
evs = E.poll()#TODO: I'm not sure if this is blocking, or if I've induced a busy loop...
for fn,ev in evs:
if fn == dd.stderr.fileno():
if (ev & select.EPOLLIN):
#dd sent some output
line = dd.stderr.read()
m = rBytes.match(line)
#sys.stderr.buffer.write(line)
if m:
x = int(m.groups()[0])
zenity.stdin.write(('%f\n' % calcPercent(x)).encode())
zenity.stdin.flush()
if (ev & select.EPOLLHUP):
#dd exited
pass#The containing loop will handle this; don't need to take action
if fn == zenity.stdin.fileno():
if (ev & select.EPOLLHUP):#TODO: WHY DOESN'T THIS ACTIVATE??
#Zenity exited
dd.terminate()
if dd.returncode == 0:
#dd exited successfully
zenity.stdin.write('100\n'.encode())
zenity.stdin.flush()
else:
zenity.terminate()
# Functions below here #
def parseifname(argv=sys.argv[:1], default='/dev/stdin'):
'''Given dd's argument list, attempts to return the name of that file which dd would use as its input file'''
M = re.compile(r'^if=(.*)$')
ifname = default
for x in argv:
m = M.match(x)
if m:
ifname = m.groups()[0]
return ifname
def sizeof(fname):
'''Attempts to find the length, in bytes, of the given file or block device'''
s = os.stat(fname)
m = s.st_mode
try:
if stat.S_ISREG(m):
#Regular File
n = s.st_size
elif stat.S_ISBLK(m):
#Block Device
n = int(subprocess.check_output(['lsblk', '-b', '-n', '-l', '-o', 'SIZE', '-d', fname]))
else:
raise ValueError("file is neither a standard nor block file")
except:
#Unidentifiable
n = None
return n
def genCalcPercent(n):
'''Given n, returns a function which, given x, returns either x as a percentage of n, or some sane stand-in for such'''
if n:
#Input file size was identified
return lambda x: 100 * x / n
else:
#Input file size was unidentifiable, zero, or otherwise falsy
#we'll at least try to visually show progress
return lambda x: 99.99999 * (1 - 0.5 ** (x / 2**32))
def set_nonblocking(fd=sys.stdin):
'''Appends os.O_NONBLOCK to the given file descriptor's flags.'''
return fcntl.fcntl(
fd,
fcntl.F_SETFL,
fcntl.fcntl(fd,fcntl.F_GETFL)
| os.O_NONBLOCK
)
def set_direct(fd=sys.stdout):
'''Appends os.O_SYNC to the given file descriptor's flags.'''
return fcntl.fcntl(
fd,
fcntl.F_SETFL,
fcntl.fcntl(fd,fcntl.F_GETFL)
| os.O_SYNC
)
class ePoll:
'''Thin contextlib wrapper around select.epoll; allows tersely watching multiple events'''
def __init__(self, fdSpecs):
self._E = select.epoll()
self._fds = []
for fd,opt in fdSpecs:
self._E.register(fd,opt)
self._fds.append(fd)
def __enter__(self):
return self._E
def __exit__(self, exc_type, exc_value, traceback):
for fd in self._fds:
self._E.unregister(fd)
self._E.close()
if __name__=='__main__':
main()
事实证明答案很简单:使用EPOLLERR
而不是EPOLLHUP
。
我严重怀疑这是正确的解决方案*,但它确实有效:
import select, subprocess, time
E = select.epoll()
p = subprocess.Popen(["sh", "-c", "sleep 3"], stdin=subprocess.PIPE)
#time.sleep(5) #Uncomment this line to convince yourself there is no race-condition here
E.register(p.stdin, select.EPOLLERR)
print("Polling...")
evs = E.poll()
print("Caught events!")
assert (p.stdin.fileno(), select.EPOLLERR) in evs
E.close()
*如果这不是正确的解决方案,那么即使是现在,我也非常想发现正确的解决方案是什么。
(如果有人关心,这里是原始问题的脚本 the completed version。)
我正在 Python 中写一个 Zenity-based status monitor for GNU dd(1)
。
由于我所针对的系统的限制,包装器必须 运行 on Python 2,并且不能引入外部库。
其中一项要求是 Zenity 的“取消”按钮在 dd 尚未完成时终止。
我必须立即执行以下操作(即triggered/driven/immediately);如果以下多个条件met/triggered 同时,它们将按列出的顺序执行:
- 当 Zenity 退出时,终止 dd
- 当 dd 写入其
stderr
时,munge+将该数据转发到 Zenity 的stdin
- 当 dd 退出时,如果其 return 代码不为 0,则终止 Zenity
但是,epoll 对象似乎只在 dd 的输出上触发;它永远不会在 Zenity 退出时触发,尽管我在 Zenity 的 stdin
.
EPOLLHUP
如何should/can做到这一点?我知道 epoll 是唯一可用于正确触发 dd
输出的原语(通过 EPOLLIN
);我也明白这是一个笨拙的原语,可能不适合在 Zenity 退出时触发。 (如果需要,我可以在这个文件中实现更多逻辑;这样做比引入任何第 3 方库要好得多,无论它多么小或“常见”。我重申,我知道 epoll 很难使用,可能需要一个大量的胶合逻辑。)
或者:如果 epoll 不是监视 subprocess
退出的正确原语,那么在监视子进程输出 in 时监视子进程退出的正确方法是什么a Python 2兼容方式?
(我本身并不需要多线程功能;按顺序执行所有操作将完全符合规范;但是,如果多线程编程在这种情况下绝对必要避免忙循环,那就这样吧。)
下面是我目前的完整代码。
#!/usr/bin/env python
from __future__ import division
import sys,os,stat,fcntl,select,subprocess,re
def main(args=sys.argv[1:]):
fname = parseifname(args)
n = sizeof(fname)
dcmd = ['dd'] + args + ['status=progress']
zcmd = ['zenity', '--progress', '--time-remaining']
#Launch dd
dd = subprocess.Popen(dcmd, stderr=subprocess.PIPE)
set_nonblocking(dd.stderr)
#Launch Zenity
zenity = subprocess.Popen(zcmd, stdin=subprocess.PIPE)
set_direct(zenity.stdin)#TODO: why doesn't this line work?*
#set title/status
zenity.stdin.write(('#%s\n' % ' '.join(dcmd)).encode())
zenity.stdin.flush()#*i.e. instances of this line shouldn't be necessary...
#We want to trigger on all of the following:
toPoll = [
(dd.stderr, select.EPOLLIN #dd status update
| select.EPOLLHUP), #dd exit
(zenity.stdin, select.EPOLLHUP), #Zenity exit
]
calcPercent = genCalcPercent(n)
with ePoll(toPoll) as E:
rBytes = re.compile(r'\r(\d+) bytes'.encode())
while dd.poll() is None:
evs = E.poll()#TODO: I'm not sure if this is blocking, or if I've induced a busy loop...
for fn,ev in evs:
if fn == dd.stderr.fileno():
if (ev & select.EPOLLIN):
#dd sent some output
line = dd.stderr.read()
m = rBytes.match(line)
#sys.stderr.buffer.write(line)
if m:
x = int(m.groups()[0])
zenity.stdin.write(('%f\n' % calcPercent(x)).encode())
zenity.stdin.flush()
if (ev & select.EPOLLHUP):
#dd exited
pass#The containing loop will handle this; don't need to take action
if fn == zenity.stdin.fileno():
if (ev & select.EPOLLHUP):#TODO: WHY DOESN'T THIS ACTIVATE??
#Zenity exited
dd.terminate()
if dd.returncode == 0:
#dd exited successfully
zenity.stdin.write('100\n'.encode())
zenity.stdin.flush()
else:
zenity.terminate()
# Functions below here #
def parseifname(argv=sys.argv[:1], default='/dev/stdin'):
'''Given dd's argument list, attempts to return the name of that file which dd would use as its input file'''
M = re.compile(r'^if=(.*)$')
ifname = default
for x in argv:
m = M.match(x)
if m:
ifname = m.groups()[0]
return ifname
def sizeof(fname):
'''Attempts to find the length, in bytes, of the given file or block device'''
s = os.stat(fname)
m = s.st_mode
try:
if stat.S_ISREG(m):
#Regular File
n = s.st_size
elif stat.S_ISBLK(m):
#Block Device
n = int(subprocess.check_output(['lsblk', '-b', '-n', '-l', '-o', 'SIZE', '-d', fname]))
else:
raise ValueError("file is neither a standard nor block file")
except:
#Unidentifiable
n = None
return n
def genCalcPercent(n):
'''Given n, returns a function which, given x, returns either x as a percentage of n, or some sane stand-in for such'''
if n:
#Input file size was identified
return lambda x: 100 * x / n
else:
#Input file size was unidentifiable, zero, or otherwise falsy
#we'll at least try to visually show progress
return lambda x: 99.99999 * (1 - 0.5 ** (x / 2**32))
def set_nonblocking(fd=sys.stdin):
'''Appends os.O_NONBLOCK to the given file descriptor's flags.'''
return fcntl.fcntl(
fd,
fcntl.F_SETFL,
fcntl.fcntl(fd,fcntl.F_GETFL)
| os.O_NONBLOCK
)
def set_direct(fd=sys.stdout):
'''Appends os.O_SYNC to the given file descriptor's flags.'''
return fcntl.fcntl(
fd,
fcntl.F_SETFL,
fcntl.fcntl(fd,fcntl.F_GETFL)
| os.O_SYNC
)
class ePoll:
'''Thin contextlib wrapper around select.epoll; allows tersely watching multiple events'''
def __init__(self, fdSpecs):
self._E = select.epoll()
self._fds = []
for fd,opt in fdSpecs:
self._E.register(fd,opt)
self._fds.append(fd)
def __enter__(self):
return self._E
def __exit__(self, exc_type, exc_value, traceback):
for fd in self._fds:
self._E.unregister(fd)
self._E.close()
if __name__=='__main__':
main()
事实证明答案很简单:使用EPOLLERR
而不是EPOLLHUP
。
我严重怀疑这是正确的解决方案*,但它确实有效:
import select, subprocess, time
E = select.epoll()
p = subprocess.Popen(["sh", "-c", "sleep 3"], stdin=subprocess.PIPE)
#time.sleep(5) #Uncomment this line to convince yourself there is no race-condition here
E.register(p.stdin, select.EPOLLERR)
print("Polling...")
evs = E.poll()
print("Caught events!")
assert (p.stdin.fileno(), select.EPOLLERR) in evs
E.close()
*如果这不是正确的解决方案,那么即使是现在,我也非常想发现正确的解决方案是什么。
(如果有人关心,这里是原始问题的脚本 the completed version。)