如何停止从进程中累积标准输出?

How to stop accumulation of stdout from process?

所以我正在使用 await asyncio.create_subprocess_shell(args,stdout=asyncio.subprocess.PIPE,limit=self.chunk_size),并从 await 的结果中获取标准输出。返回的是asyncio的StreamReader,其中我使用read(self.chunk_size)协程将每个块放入队列中,由一些线程处理。

它能够 运行 很好,但是我正在处理大量的标准输出,我发现内存使用量增加缓慢。

令我感到困惑的是,当我限制写入标准输出的内容的缓冲区大小时,为什么内存使用量会继续增加。我以为设置限制会限制正在读取的数据,并保持不变。

感谢您的任何见解!

编辑: 使用 tshark 的最小 运行ning 示例。您可以从 here

获取示例 pcap

我实际上看到运行这个例子是用yes命令和iostat -t 1命令,我没有运行进入这个问题。但是我用这个 tshark 命令做了。 运行宁 python 3.6

import asyncio,sys
from multiprocessing import Process, Queue
from threading import Thread
from shlex import quote

q = Queue()
chunk_size = 8192 
async def yes_command():
        args = ' '.join(["tshark", "-r", quote(sys.argv[1]),"-T", "fields", "-e", "eth.src", "-e", "eth.dst", "-e", "ip.src", "-e", "ip.dst", "-e", "ip.proto", "-e", "tcp.srcport", "-e", "tcp.dstport", "-e", "udp.srcport", "-e", "udp.dstport", "-E", "separator=@"])
        process = await asyncio.create_subprocess_shell(args,stdout=asyncio.subprocess.PIPE,limit=chunk_size)
        #this is a Stream Reader
        return process.stdout


async def test():
    streamer = await yes_command()

    registered = set()
    task = asyncio.ensure_future(streamer.read(chunk_size))
    registered.add(task)
    while registered:
        done,pending = await asyncio.wait(
            registered, timeout=None, return_when=asyncio.FIRST_COMPLETED
        )
        if not done:
            break
        for f in done:
            registered.discard(f)
            res = f.result()
            q.put(res)
            if res != b'':
                task = asyncio.ensure_future(streamer.read(chunk_size))
                registered.add(task)
            else:
                q.put(None)


    return 'done' 

### A very simple worker
def worker(q):
    while True:
        res = q.get()
        ### do something with res
        if res is None:
            print("none")
            break

    
if __name__ == "__main__":
    task_list = []

    consumer = Thread(target=worker,args=(q,))
    consumer.daemon=True
    consumer.start()

    loop = asyncio.get_event_loop()
    ### alter range value to spawn x tshark processes.
    for i in range(5):
        task_list.append(test())
    
    commands = asyncio.gather(*task_list)
    result = loop.run_until_complete(commands)
    consumer.join()
    loop.close()

我将把它留在这里,以防有人遇到同样的问题。

read 行更改为 readexactly() 解决了这个问题。问题是 read 只读取了一个非常小的上限,这将很多信息附加到我的队列中,而这些信息并没有足够快地出队。刚刚测试了大输出的修复,得到了我需要的东西。

虽然您的回答解决了眼前的问题,但仍然存在潜在的问题,即您的队列具有无限容量,使您容易受到无限内存积累的影响。如果没有 backpressure(暂停生产者的能力),只要队列消费者始终比生产者慢,就会出现内存问题,而且很难保证这种情况永远不会发生,尤其是在生产中.

我建议切换到有界队列以获得背压。为此,您不能使用 queue.Queue,因为它会阻塞,您需要 asyncio.Queue。您需要:

  • 使用 asyncio.Queue(10) 创建队列 - 您可以使用任何正数作为容量。
  • 使用 await q.put(res) 入队数据 - q.put 暂停直到队列中有空闲槽,提供背压)
  • 使用 res = asyncio.run_coroutine_threadsafe(loop, q.get()).result() 检索数据 - 你需要 run_coroutine_threadsafe 因为事件循环 运行 在不同的线程中。

此外,将子进程输出传输到队列的循环似乎不必要地复杂。 (也许这是对更复杂的实际代码进行简化的残余。)我建议用一个简单的循环来做:

async def test():
    streamer = await yes_command()
    while True:
        data = await streamer.read(chunk_size)
        if data == b'':
            await q.put(None)
            break
        await q.put(data)

如果您需要并行 运行 多个这样的生产者,您可以只启动多个协程 运行 上述循环 - 您不需要在一个显式集合中兼顾读者。