实时从管道获取基于行的输出

Get line-based output from pipe in real-time

我想(近)实时逐行读取 tcpdump 子进程的输出,但我需要评估管道是否为空(因此队列)的选项。线程等待 0.5 秒,获取所有排队的输出行,对其进行处理(例如,在 0.5 秒内平均分配数据包)和 returns 东西。

最小的非工作示例:

millis = lambda: int(round(time.time() * 1000))
def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        print(millis())
        print(line)
        queue.put(line)
    out.close()

def infiniteloop1():
    p = Popen( [ 'sudo', 'tcpdump', '-i', 'wlan0', '-nn', '-s0', '-q', '-l', '-p', '-S' ], stdout=subprocess.PIPE, stderr=STDOUT)
    q = Queue()
    t = Thread(target=enqueue_output, args=(p.stdout, q))
    t.daemon = True # thread dies with the program
    t.start()

    while True:
        while True:
            # read line without blocking
            try: 
                row = q.get_nowait() # or q.get(timeout=.1)
            except Empty:
                print('empty')
                break
            else:
                pass
        time.sleep(0.5)
thread1 = threading.Thread(target=infiniteloop1)
thread1.daemon = True
thread1.start()

捕获连续包流时的输出:

[...]
1552905183422
10:33:03.334167 IP 192.168.1.2.36189 > a.b.c.d.443: tcp 437
1552905183422
10:33:03.357215 IP a.b.c.d.443 > 192.168.1.2.36189: tcp 0
1552905183423
10:33:03.385145 IP 192.168.1.2.36189 > a.b.c.d.443: tcp 437
empty
empty
1552905184438
10:33:03.408408 IP a.b.c.d.443 > 192.168.1.2.36189: tcp 0
1552905184439
10:33:03.428045 IP 192.168.1.2.36189 > a.b.c.d.443: tcp 437
1552905184439
10:33:03.451235 IP a.b.c.d.443 > 192.168.1.2.36189: tcp 0
[...]

注意连续的两个"empty"。第一个 "empty" 之前的最后一个数据包被 tcpdump 捕获 10:33:03.385145 并传送到 1552905183423 的队列,耗时 38 毫秒。在这两个 "emptys" 之间,没有数据包被传送到队列中。第二个 "empty" 之后的第一个包在 10:33:03.408408 被捕获并交付 1552905184438,它比前一个数据包晚 1 秒交付但在 "emptys" 之间被捕获。为什么不在"emptys"之间传递?这种情况并不少见,但是每秒弹出队列都会导致没有包裹被送达,这是为什么?

The first package after the second "empty" was captured at 10:33:03.408408 and delivered 1552905184438, it was delivered 1 second after the previous packet but captured between the "emptys".

根据您的代码,系统时间戳仅在 for line in iter(out.readline, b'') returns 中的迭代器为新项目时​​才会计算和打印,因此这似乎是延迟的来源。

我怀疑 stdio 缓冲是罪魁祸首。在 Linux(即 libc/glibc)上,如果 STDOUT 描述符引用 TTY,则启用行缓冲。如果它指的是其他东西(例如管道),则 STDOUT 描述符是完全缓冲的;在调用 write 系统调用之前,您的进程需要填充 4096 字节(默认为 Linux)。
非常粗略地计算,根据您在此处显示的输出,您的子进程似乎每 ~0.025 秒生成 ~65 个字节。给定一个 4kB 缓冲区,填充它并触发 flush/write.

大约需要 1.625 秒

subprocess.PIPE 读取并将输出发送到主进程的标准输出所需的时间要少得多,因此您会看到 tcpdump 输出的突发,即打印间隔约 25 毫秒(接收来自 stdout 迭代器)在几微秒内,您的程序随后等待直到下一个 4kB 被刷新。

如果您有可能安装第 3 方软件包(并使用 Python >= 2.7),您可能需要查看 pexpect。该包的孩子连接到 PTY,使系统将它们视为交互式程序,因此它们的标准输出描述符是 line-buffered.