写入多个输出文件的程序的流式包装器

Streaming wrapper around program that writes to multiple output files

有一个程序(我无法修改)创建两个输出文件。我正在尝试编写一个 Python 包装器来调用该程序,同时读取两个输出流,组合输出并打印到标准输出(以促进流式传输)。我怎样才能在没有死锁的情况下做到这一点?下面的概念证明工作正常,但是当我将这种方法应用于实际程序时它会死锁。


概念证明:这是一个虚拟程序,bogus.py,它创建了两个输出文件,就像我试图包装的程序一样。

#!/usr/bin/env python
from __future__ import print_function
import sys
with open(sys.argv[1], 'w') as f1, open(sys.argv[2], 'w') as f2:
    for i in range(1000):
        if i % 2 == 0:
            print(i, file=f1)
        else:
            print(i, file=f2)

这里是 Python 包装器,它调用程序并组合它的两个输出(每次交错 4 行)。

#!/usr/bin/env python
from __future__ import print_function
from contextlib import contextmanager
import os
import shutil
import subprocess
import tempfile

@contextmanager
def named_pipe():
    """
    Create a temporary named pipe.

    Stolen shamelessly from Whosebug:
    
    """
    dirname = tempfile.mkdtemp()
    try:
        path = os.path.join(dirname, 'named_pipe')
        os.mkfifo(path)
        yield path
    finally:
        shutil.rmtree(dirname)

with named_pipe() as f1, named_pipe() as f2:
    cmd = ['./bogus.py', f1, f2]
    child = subprocess.Popen(cmd)
    with open(f1, 'r') as in1, open(f2, 'r') as in2:
        buff = list()
        for i, lines in enumerate(zip(in1, in2)):
            line1 = lines[0].strip()
            line2 = lines[1].strip()
            print(line1)
            buff.append(line2)
            if len(buff) == 4:
                for line in buff:
                    print(line)

Popen 只生成进程。您必须执行类似 child.communicate() 的操作才能真正与其交互并获取其输出。

此外,我认为您需要 open 在开始该过程之前读取管道。

I'm seeing big chunks of one file and then big chunks of the other file, regardless of whether I write to stdout, stderr, or tty.

如果您不能使 child 对文件使用 line-buffering 那么一个简单的解决方案 在进程仍在进行时从输出文件中读取完整的交错行 运行 一旦输出可用 就是使用线程:

#!/usr/bin/env python2
from subprocess import Popen
from threading import Thread
from Queue import Queue

def readlines(path, queue):
    try:
        with open(path) as pipe:
            for line in iter(pipe.readline, ''):
                queue.put(line)
    finally:
        queue.put(None)

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queue = Queue()
    for path in paths:
        Thread(target=readlines, args=[path, queue]).start()
    for _ in paths:
        for line in iter(queue.get, None):
            print line.rstrip('\n')

其中 .

pipe.readline() 对于 Python 2 上的 non-blocking 管道已损坏,这就是此处使用线程的原因。


要打印一个文件中的一行,然后打印另一个文件中的一行:

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queues = [Queue() for _ in paths]
    for path, queue in zip(paths, queues):
        Thread(target=readlines, args=[path, queue]).start()
    while queues:
        for q in queues:
            line = q.get()
            if line is None:  # EOF
                queues.remove(q)
            else:
                print line.rstrip('\n')

如果 child.py 向一个文件写入的行多于另一个文件,那么差异将保留在内存中,因此 queues 中的各个队列可能会无限增长,直到它们填满所有内存。您可以设置队列中项目的最大数量,但是您必须将超时传递给 q.get() 否则代码可能会死锁。


如果您需要从一个输出文件中精确打印 4 行,然后从另一个输出文件中精确打印 4 行,等等,那么您可以稍微修改给定的代码示例:

    while queues:
        # print 4 lines from one queue followed by 4 lines from another queue
        for q in queues:
            for _ in range(4):
                line = q.get()
                if line is None:  # EOF
                    queues.remove(q)
                    break
                else:
                    print line.rstrip('\n')

它不会死锁,但如果您的 child 进程将太多数据写入一个文件而没有向另一个文件写入足够的数据,它可能会占用所有内存(只有差异保留在内存中 - 如果文件是相对相等;该程序支持任意大输出文件)。