如何停止从进程中累积标准输出?
How to stop accumulation of stdout from process?
所以我正在使用
await asyncio.create_subprocess_shell(args,stdout=asyncio.subprocess.PIPE,limit=self.chunk_size)
,并从 await 的结果中获取标准输出。返回的是asyncio的StreamReader,其中我使用read(self.chunk_size)
协程将每个块放入队列中,由一些线程处理。
它能够 运行 很好,但是我正在处理大量的标准输出,我发现内存使用量增加缓慢。
令我感到困惑的是,当我限制写入标准输出的内容的缓冲区大小时,为什么内存使用量会继续增加。我以为设置限制会限制正在读取的数据,并保持不变。
感谢您的任何见解!
编辑:
使用 tshark 的最小 运行ning 示例。您可以从 here
获取示例 pcap
我实际上看到运行这个例子是用yes命令和iostat -t 1命令,我没有运行进入这个问题。但是我用这个 tshark 命令做了。
运行宁 python 3.6
import asyncio,sys
from multiprocessing import Process, Queue
from threading import Thread
from shlex import quote
q = Queue()
chunk_size = 8192
async def yes_command():
args = ' '.join(["tshark", "-r", quote(sys.argv[1]),"-T", "fields", "-e", "eth.src", "-e", "eth.dst", "-e", "ip.src", "-e", "ip.dst", "-e", "ip.proto", "-e", "tcp.srcport", "-e", "tcp.dstport", "-e", "udp.srcport", "-e", "udp.dstport", "-E", "separator=@"])
process = await asyncio.create_subprocess_shell(args,stdout=asyncio.subprocess.PIPE,limit=chunk_size)
#this is a Stream Reader
return process.stdout
async def test():
streamer = await yes_command()
registered = set()
task = asyncio.ensure_future(streamer.read(chunk_size))
registered.add(task)
while registered:
done,pending = await asyncio.wait(
registered, timeout=None, return_when=asyncio.FIRST_COMPLETED
)
if not done:
break
for f in done:
registered.discard(f)
res = f.result()
q.put(res)
if res != b'':
task = asyncio.ensure_future(streamer.read(chunk_size))
registered.add(task)
else:
q.put(None)
return 'done'
### A very simple worker
def worker(q):
while True:
res = q.get()
### do something with res
if res is None:
print("none")
break
if __name__ == "__main__":
task_list = []
consumer = Thread(target=worker,args=(q,))
consumer.daemon=True
consumer.start()
loop = asyncio.get_event_loop()
### alter range value to spawn x tshark processes.
for i in range(5):
task_list.append(test())
commands = asyncio.gather(*task_list)
result = loop.run_until_complete(commands)
consumer.join()
loop.close()
我将把它留在这里,以防有人遇到同样的问题。
将 read
行更改为 readexactly()
解决了这个问题。问题是 read
只读取了一个非常小的上限,这将很多信息附加到我的队列中,而这些信息并没有足够快地出队。刚刚测试了大输出的修复,得到了我需要的东西。
虽然您的回答解决了眼前的问题,但仍然存在潜在的问题,即您的队列具有无限容量,使您容易受到无限内存积累的影响。如果没有 backpressure(暂停生产者的能力),只要队列消费者始终比生产者慢,就会出现内存问题,而且很难保证这种情况永远不会发生,尤其是在生产中.
我建议切换到有界队列以获得背压。为此,您不能使用 queue.Queue
,因为它会阻塞,您需要 asyncio.Queue
。您需要:
- 使用
asyncio.Queue(10)
创建队列 - 您可以使用任何正数作为容量。
- 使用
await q.put(res)
入队数据 - q.put
暂停直到队列中有空闲槽,提供背压)
- 使用
res = asyncio.run_coroutine_threadsafe(loop, q.get()).result()
检索数据 - 你需要 run_coroutine_threadsafe
因为事件循环 运行 在不同的线程中。
此外,将子进程输出传输到队列的循环似乎不必要地复杂。 (也许这是对更复杂的实际代码进行简化的残余。)我建议用一个简单的循环来做:
async def test():
streamer = await yes_command()
while True:
data = await streamer.read(chunk_size)
if data == b'':
await q.put(None)
break
await q.put(data)
如果您需要并行 运行 多个这样的生产者,您可以只启动多个协程 运行 上述循环 - 您不需要在一个显式集合中兼顾读者。
所以我正在使用
await asyncio.create_subprocess_shell(args,stdout=asyncio.subprocess.PIPE,limit=self.chunk_size)
,并从 await 的结果中获取标准输出。返回的是asyncio的StreamReader,其中我使用read(self.chunk_size)
协程将每个块放入队列中,由一些线程处理。
它能够 运行 很好,但是我正在处理大量的标准输出,我发现内存使用量增加缓慢。
令我感到困惑的是,当我限制写入标准输出的内容的缓冲区大小时,为什么内存使用量会继续增加。我以为设置限制会限制正在读取的数据,并保持不变。
感谢您的任何见解!
编辑: 使用 tshark 的最小 运行ning 示例。您可以从 here
获取示例 pcap我实际上看到运行这个例子是用yes命令和iostat -t 1命令,我没有运行进入这个问题。但是我用这个 tshark 命令做了。 运行宁 python 3.6
import asyncio,sys
from multiprocessing import Process, Queue
from threading import Thread
from shlex import quote
q = Queue()
chunk_size = 8192
async def yes_command():
args = ' '.join(["tshark", "-r", quote(sys.argv[1]),"-T", "fields", "-e", "eth.src", "-e", "eth.dst", "-e", "ip.src", "-e", "ip.dst", "-e", "ip.proto", "-e", "tcp.srcport", "-e", "tcp.dstport", "-e", "udp.srcport", "-e", "udp.dstport", "-E", "separator=@"])
process = await asyncio.create_subprocess_shell(args,stdout=asyncio.subprocess.PIPE,limit=chunk_size)
#this is a Stream Reader
return process.stdout
async def test():
streamer = await yes_command()
registered = set()
task = asyncio.ensure_future(streamer.read(chunk_size))
registered.add(task)
while registered:
done,pending = await asyncio.wait(
registered, timeout=None, return_when=asyncio.FIRST_COMPLETED
)
if not done:
break
for f in done:
registered.discard(f)
res = f.result()
q.put(res)
if res != b'':
task = asyncio.ensure_future(streamer.read(chunk_size))
registered.add(task)
else:
q.put(None)
return 'done'
### A very simple worker
def worker(q):
while True:
res = q.get()
### do something with res
if res is None:
print("none")
break
if __name__ == "__main__":
task_list = []
consumer = Thread(target=worker,args=(q,))
consumer.daemon=True
consumer.start()
loop = asyncio.get_event_loop()
### alter range value to spawn x tshark processes.
for i in range(5):
task_list.append(test())
commands = asyncio.gather(*task_list)
result = loop.run_until_complete(commands)
consumer.join()
loop.close()
我将把它留在这里,以防有人遇到同样的问题。
将 read
行更改为 readexactly()
解决了这个问题。问题是 read
只读取了一个非常小的上限,这将很多信息附加到我的队列中,而这些信息并没有足够快地出队。刚刚测试了大输出的修复,得到了我需要的东西。
虽然您的回答解决了眼前的问题,但仍然存在潜在的问题,即您的队列具有无限容量,使您容易受到无限内存积累的影响。如果没有 backpressure(暂停生产者的能力),只要队列消费者始终比生产者慢,就会出现内存问题,而且很难保证这种情况永远不会发生,尤其是在生产中.
我建议切换到有界队列以获得背压。为此,您不能使用 queue.Queue
,因为它会阻塞,您需要 asyncio.Queue
。您需要:
- 使用
asyncio.Queue(10)
创建队列 - 您可以使用任何正数作为容量。 - 使用
await q.put(res)
入队数据 -q.put
暂停直到队列中有空闲槽,提供背压) - 使用
res = asyncio.run_coroutine_threadsafe(loop, q.get()).result()
检索数据 - 你需要run_coroutine_threadsafe
因为事件循环 运行 在不同的线程中。
此外,将子进程输出传输到队列的循环似乎不必要地复杂。 (也许这是对更复杂的实际代码进行简化的残余。)我建议用一个简单的循环来做:
async def test():
streamer = await yes_command()
while True:
data = await streamer.read(chunk_size)
if data == b'':
await q.put(None)
break
await q.put(data)
如果您需要并行 运行 多个这样的生产者,您可以只启动多个协程 运行 上述循环 - 您不需要在一个显式集合中兼顾读者。