写入多个输出文件的程序的流式包装器
Streaming wrapper around program that writes to multiple output files
有一个程序(我无法修改)创建两个输出文件。我正在尝试编写一个 Python 包装器来调用该程序,同时读取两个输出流,组合输出并打印到标准输出(以促进流式传输)。我怎样才能在没有死锁的情况下做到这一点?下面的概念证明工作正常,但是当我将这种方法应用于实际程序时它会死锁。
概念证明:这是一个虚拟程序,bogus.py
,它创建了两个输出文件,就像我试图包装的程序一样。
#!/usr/bin/env python
from __future__ import print_function
import sys
with open(sys.argv[1], 'w') as f1, open(sys.argv[2], 'w') as f2:
for i in range(1000):
if i % 2 == 0:
print(i, file=f1)
else:
print(i, file=f2)
这里是 Python 包装器,它调用程序并组合它的两个输出(每次交错 4 行)。
#!/usr/bin/env python
from __future__ import print_function
from contextlib import contextmanager
import os
import shutil
import subprocess
import tempfile
@contextmanager
def named_pipe():
"""
Create a temporary named pipe.
Stolen shamelessly from Whosebug:
"""
dirname = tempfile.mkdtemp()
try:
path = os.path.join(dirname, 'named_pipe')
os.mkfifo(path)
yield path
finally:
shutil.rmtree(dirname)
with named_pipe() as f1, named_pipe() as f2:
cmd = ['./bogus.py', f1, f2]
child = subprocess.Popen(cmd)
with open(f1, 'r') as in1, open(f2, 'r') as in2:
buff = list()
for i, lines in enumerate(zip(in1, in2)):
line1 = lines[0].strip()
line2 = lines[1].strip()
print(line1)
buff.append(line2)
if len(buff) == 4:
for line in buff:
print(line)
Popen
只生成进程。您必须执行类似 child.communicate()
的操作才能真正与其交互并获取其输出。
此外,我认为您需要 open
在开始该过程之前读取管道。
I'm seeing big chunks of one file and then big chunks of the other file, regardless of whether I write to stdout, stderr, or tty.
如果您不能使 child 对文件使用 line-buffering 那么一个简单的解决方案 在进程仍在进行时从输出文件中读取完整的交错行 运行 一旦输出可用 就是使用线程:
#!/usr/bin/env python2
from subprocess import Popen
from threading import Thread
from Queue import Queue
def readlines(path, queue):
try:
with open(path) as pipe:
for line in iter(pipe.readline, ''):
queue.put(line)
finally:
queue.put(None)
with named_pipes(n=2) as paths:
child = Popen(['python', 'child.py'] + paths)
queue = Queue()
for path in paths:
Thread(target=readlines, args=[path, queue]).start()
for _ in paths:
for line in iter(queue.get, None):
print line.rstrip('\n')
其中 .
pipe.readline()
对于 Python 2 上的 non-blocking 管道已损坏,这就是此处使用线程的原因。
要打印一个文件中的一行,然后打印另一个文件中的一行:
with named_pipes(n=2) as paths:
child = Popen(['python', 'child.py'] + paths)
queues = [Queue() for _ in paths]
for path, queue in zip(paths, queues):
Thread(target=readlines, args=[path, queue]).start()
while queues:
for q in queues:
line = q.get()
if line is None: # EOF
queues.remove(q)
else:
print line.rstrip('\n')
如果 child.py
向一个文件写入的行多于另一个文件,那么差异将保留在内存中,因此 queues
中的各个队列可能会无限增长,直到它们填满所有内存。您可以设置队列中项目的最大数量,但是您必须将超时传递给 q.get()
否则代码可能会死锁。
如果您需要从一个输出文件中精确打印 4 行,然后从另一个输出文件中精确打印 4 行,等等,那么您可以稍微修改给定的代码示例:
while queues:
# print 4 lines from one queue followed by 4 lines from another queue
for q in queues:
for _ in range(4):
line = q.get()
if line is None: # EOF
queues.remove(q)
break
else:
print line.rstrip('\n')
它不会死锁,但如果您的 child 进程将太多数据写入一个文件而没有向另一个文件写入足够的数据,它可能会占用所有内存(只有差异保留在内存中 - 如果文件是相对相等;该程序支持任意大输出文件)。
有一个程序(我无法修改)创建两个输出文件。我正在尝试编写一个 Python 包装器来调用该程序,同时读取两个输出流,组合输出并打印到标准输出(以促进流式传输)。我怎样才能在没有死锁的情况下做到这一点?下面的概念证明工作正常,但是当我将这种方法应用于实际程序时它会死锁。
概念证明:这是一个虚拟程序,bogus.py
,它创建了两个输出文件,就像我试图包装的程序一样。
#!/usr/bin/env python
from __future__ import print_function
import sys
with open(sys.argv[1], 'w') as f1, open(sys.argv[2], 'w') as f2:
for i in range(1000):
if i % 2 == 0:
print(i, file=f1)
else:
print(i, file=f2)
这里是 Python 包装器,它调用程序并组合它的两个输出(每次交错 4 行)。
#!/usr/bin/env python
from __future__ import print_function
from contextlib import contextmanager
import os
import shutil
import subprocess
import tempfile
@contextmanager
def named_pipe():
"""
Create a temporary named pipe.
Stolen shamelessly from Whosebug:
"""
dirname = tempfile.mkdtemp()
try:
path = os.path.join(dirname, 'named_pipe')
os.mkfifo(path)
yield path
finally:
shutil.rmtree(dirname)
with named_pipe() as f1, named_pipe() as f2:
cmd = ['./bogus.py', f1, f2]
child = subprocess.Popen(cmd)
with open(f1, 'r') as in1, open(f2, 'r') as in2:
buff = list()
for i, lines in enumerate(zip(in1, in2)):
line1 = lines[0].strip()
line2 = lines[1].strip()
print(line1)
buff.append(line2)
if len(buff) == 4:
for line in buff:
print(line)
Popen
只生成进程。您必须执行类似 child.communicate()
的操作才能真正与其交互并获取其输出。
此外,我认为您需要 open
在开始该过程之前读取管道。
I'm seeing big chunks of one file and then big chunks of the other file, regardless of whether I write to stdout, stderr, or tty.
如果您不能使 child 对文件使用 line-buffering 那么一个简单的解决方案 在进程仍在进行时从输出文件中读取完整的交错行 运行 一旦输出可用 就是使用线程:
#!/usr/bin/env python2
from subprocess import Popen
from threading import Thread
from Queue import Queue
def readlines(path, queue):
try:
with open(path) as pipe:
for line in iter(pipe.readline, ''):
queue.put(line)
finally:
queue.put(None)
with named_pipes(n=2) as paths:
child = Popen(['python', 'child.py'] + paths)
queue = Queue()
for path in paths:
Thread(target=readlines, args=[path, queue]).start()
for _ in paths:
for line in iter(queue.get, None):
print line.rstrip('\n')
其中
pipe.readline()
对于 Python 2 上的 non-blocking 管道已损坏,这就是此处使用线程的原因。
要打印一个文件中的一行,然后打印另一个文件中的一行:
with named_pipes(n=2) as paths:
child = Popen(['python', 'child.py'] + paths)
queues = [Queue() for _ in paths]
for path, queue in zip(paths, queues):
Thread(target=readlines, args=[path, queue]).start()
while queues:
for q in queues:
line = q.get()
if line is None: # EOF
queues.remove(q)
else:
print line.rstrip('\n')
如果 child.py
向一个文件写入的行多于另一个文件,那么差异将保留在内存中,因此 queues
中的各个队列可能会无限增长,直到它们填满所有内存。您可以设置队列中项目的最大数量,但是您必须将超时传递给 q.get()
否则代码可能会死锁。
如果您需要从一个输出文件中精确打印 4 行,然后从另一个输出文件中精确打印 4 行,等等,那么您可以稍微修改给定的代码示例:
while queues:
# print 4 lines from one queue followed by 4 lines from another queue
for q in queues:
for _ in range(4):
line = q.get()
if line is None: # EOF
queues.remove(q)
break
else:
print line.rstrip('\n')
它不会死锁,但如果您的 child 进程将太多数据写入一个文件而没有向另一个文件写入足够的数据,它可能会占用所有内存(只有差异保留在内存中 - 如果文件是相对相等;该程序支持任意大输出文件)。