缓冲过程输出导致截断?

buffering of process output causing truncation?

我有一个包含 100 行的示例文件,我正在使用带有 cat 的子进程读取它。但是,队列中的输出总是被截断。我怀疑这可能是由于 cat 缓冲了它的输出,因为它检测到一个管道。

p = subprocess.Popen("cat file.txt",
                     stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     stdin=subprocess.PIPE,
                     shell=True,
                     bufsize=0)

我正在使用单独的线程从 cat 的 stdout 和 stderr 管道读取:

def StdOutThread():
  while not p.stdout.closed and running:
    line = ""
    while not line or line[-1] != "\n":
      r = p.stdout.read(1)
      if not r:
        break
      line += r
      pending_line["line"] = line

    if line and line[-1] == "\n":
      line = line[:-1]
    if line:
      queue.put(("out", line))

这些线程启动并将它们读取的内容转储到队列中。当 cat 还活着时,主线程从这个队列中读取。

with CancelFunction(p.kill):
    try:
      stdout_thread = threading.Thread(target=StdOutThread)
      stdout_thread.start()
      while p.poll() is None:
        ReadFromQueue()
      while not queue.empty():
        ReadFromQueue()  
    finally:
      running = False
      stdout_thread.join()

我考虑过使用 pexpect 来解决这个问题,但同时也想区分 stdout 和 stderr,这似乎无法通过 pexpect 实现。将不胜感激。

我确定在 cat 的所有输出都被读取并放入队列之前,您的主线程正在退出 try 块。

请注意 cat 即使您没有阅读其所有输出也可以退出。 考虑以下事件序列:

  1. cat 写出最后一行
  2. cat 退出
  3. 在 reader 线程更改为从 cat 读取输出的最后一位之前,主线程检测到 cat 已退出(通过 p.poll()
  4. 然后主线程退出try块并设置running为false
  5. reader 个线程退出,因为 running 为假,但在此之前 最后输入已被读取。

下面是一个更简单的方法,它使用队列中的 sentinel 值 通知主线程 reader 线程已退出。

如果 cat 退出那么它最终会在它所在的管道上到达 EOF 监控。当发生这种情况时,它会将 None 放入队列中 通知主线程它完成了。当两个 reader 线程都有 完成主线程可以安全地停止监视队列和 加入话题。

import threading
import subprocess
import os
import time
import Queue
import sys

def pipe_thread(queue, name, handle):
  print "in handlehandle"
  for line in handle:
    if line[-1] == "\n":
      line = line[:-1]
    queue.put( (name, line) )
  queue.put(None)

def main():
    p = subprocess.Popen("cat file.txt",
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         stdin=subprocess.PIPE,
                         shell=True,
                         bufsize=0)

    queue = Queue.Queue()

    t1 = threading.Thread(target = pipe_thread,
                             args = [queue, "stdout", p.stdout])
    t2 = threading.Thread(target = pipe_thread,
                             args = [queue, "stderr", p.stderr])

    t1.start()
    t2.start()

    alive = 2
    count = 0
    while alive > 0:
      item = queue.get()
      if item == None:
        alive = alive - 1
      else:
        (which, line) = item
        count += 1
        print count, "got from", which, ":", line
    print "joining..."
    t1.join()
    t2.join()

main()