将大量数据写入标准输入
Writing a large amount of data to stdin
我正在向标准输入写入大量数据。
如何确保它不阻塞?
# Question code: send one large string to the child, then read one line back.
p=subprocess.Popen([path],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
# NOTE(review): this write can block once the OS pipe buffer is full if the
# child is itself blocked writing to its stdout, which nobody reads until the
# readline() below -- each side then waits on the other (pipe deadlock).
p.stdin.write('A very very very large amount of data')
p.stdin.flush()
# Only the first line of the child's output is consumed here.
output = p.stdout.readline()
在我读取一个大字符串并将其写入之后,程序似乎挂在了 p.stdin.write() 上。
我有大量文件(超过 1000 个)需要依次写入标准输入,
所以我运行了如下循环:
# Question code: one child process per input string, reading one reply line each.
#this loop is repeated for all the files
for stri in lines:
    # A new child process (with a new pair of pipes) is created per item.
    p=subprocess.Popen([path],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
    # NOTE(review): can block if `stri` is larger than the pipe buffer while
    # the child is blocked writing output that is not read yet.
    p.stdin.write(stri)
    output = p.stdout.readline()
    # NOTE(review): p.stdin is never closed and p is never waited on, so
    # pipes and child processes may accumulate across iterations.
    #do some processing
不知为何,它总是挂在第 400 号文件上。该文件很大,内容是一个很长的字符串。
我怀疑是阻塞问题。
只有从第 0 个文件迭代到第 1000 个文件时才会出现这个问题;如果我直接从第 400 个文件开始,则不会出错。
您可能需要使用 Popen.communicate()。
如果您向 stdin 写入大量数据,而子进程在此期间向 stdout 产生输出,那么子进程的 stdout 管道缓冲区可能会在所有 stdin 数据被处理之前就被填满。此时子进程在写入 stdout 时阻塞(因为您没有读取它),而您在写入 stdin 时也被阻塞——双方互相等待,形成死锁。
Popen.communicate() 可以同时写入 stdin 并读取 stdout/stderr,从而避免上述问题。
注意:Popen.communicate() 只适用于输入和输出数据都能放进内存(即数据量不太大)的情况。
更新:
如果您决定使用线程,下面是一个父进程和子进程的示例实现,您可以根据自己的需要进行修改:
parent.py:
#!/usr/bin/env python2
import os
import sys
import subprocess
import threading
import Queue
class MyStreamingSubprocess(object):
    """Feed items to a child process and collect its replies via two threads.

    A writer thread sends queued items to the child's stdin while a reader
    thread keeps draining the child's stdout, so an unread stdout pipe can
    never stop the child from consuming its stdin (and vice versa).

    The request/response pairing assumes the child answers every item with
    exactly one line of output; otherwise replies get out of step.
    """

    def __init__(self, *argv):
        """Start the child process ``argv`` and the reader/writer threads."""
        self.process = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        # Items to send to the child; None means "close stdin and stop".
        self.stdin_queue = Queue.Queue()
        # Lines read from the child; '' on EOF, None on an I/O error.
        self.stdout_queue = Queue.Queue()
        self.stdin_thread = threading.Thread(target=self._stdin_writer_thread)
        self.stdout_thread = threading.Thread(target=self._stdout_reader_thread)
        self.stdin_thread.start()
        self.stdout_thread.start()

    def process_item(self, item):
        """Send ``item`` to the child and return its next line of output.

        Returns an empty string if the child closed its stdout, or None if
        writing to / reading from the child raised IOError.
        """
        self.stdin_queue.put(item)
        return self.stdout_queue.get()

    def terminate(self):
        """Close the child's stdin, stop the child and wait for it.

        Returns the exit status reported by ``Popen.wait()``.
        """
        self.stdin_queue.put(None)
        # Only signal a child that is still running: signalling one that has
        # already exited raises an error on some (older, Windows) Pythons.
        if self.process.poll() is None:
            self.process.terminate()
        self.stdin_thread.join()
        self.stdout_thread.join()
        return self.process.wait()

    def _stdin_writer_thread(self):
        """Write queued items to the child's stdin until told to stop.

        Always closes the child's stdin on exit, so the child sees EOF.
        """
        try:
            while 1:
                item = self.stdin_queue.get()
                if item is None:
                    # End of input: the finally clause closes stdin, which
                    # signals the child that the end of the input has been
                    # reached (some console programs stop when reading from
                    # stdin returns an empty string).
                    break
                try:
                    self.process.stdin.write(item)
                    # Push the item through now; with a buffered stdin pipe
                    # (e.g. Python 3's default bufsize) it would otherwise
                    # sit in our buffer while process_item() waits for the
                    # reply -- a deadlock.
                    self.process.stdin.flush()
                except IOError:
                    # The child is gone: make sure the pending
                    # process_item() call doesn't deadlock.
                    self.stdout_queue.put(None)
                    break
        finally:
            try:
                self.process.stdin.close()
            except IOError:
                # Closing flushes any buffered bytes, which fails with a
                # broken pipe if the child already exited; there is nobody
                # left to deliver them to, so ignore it.
                pass

    def _stdout_reader_thread(self):
        """Forward each line of the child's stdout to ``stdout_queue``.

        Stops after queueing an empty string (the child closed its stdout)
        or None (reading raised IOError).
        """
        while 1:
            try:
                output = self.process.stdout.readline()
            except IOError:
                output = None
            self.stdout_queue.put(output)
            # output is empty string if the process has
            # finished or None if an IOError occurred
            if not output:
                break
if __name__ == '__main__':
    # Interactive driver: send each entered line to child.py and show its reply.
    child_script_path = os.path.join(os.path.dirname(__file__), 'child.py')
    # -u makes the child's stdout unbuffered so every reply arrives at once.
    process = MyStreamingSubprocess(sys.executable, '-u', child_script_path)
    try:
        while 1:
            item = raw_input('Enter an item to process (leave empty and press ENTER to exit): ')
            if not item:
                break
            # child.py answers each input line with one output line.
            result = process.process_item(item + '\n')
            if result:
                # The reply still ends with the child's newline; strip it so
                # print() doesn't emit an extra blank line after each result.
                print('Result: ' + result.rstrip('\n'))
            else:
                print('Error processing item! Exiting.')
                break
    finally:
        print('Terminating child process...')
        process.terminate()
        print('Finished.')
child.py:
#!/usr/bin/env python2
import sys
# Echo every input line back to the parent, prefixed with 'Processed: '.
while 1:
    item = sys.stdin.readline()
    if not item:
        # readline() returns '' only at EOF (the parent closed our stdin);
        # without this check the loop spins forever writing 'Processed: '.
        break
    sys.stdout.write('Processed: ' + item)
    # Deliver the reply immediately even when not started with python -u.
    sys.stdout.flush()
注意:reader/writer 线程中捕获了 IOError,用于处理子进程退出、崩溃或被杀死的情况。
要以可移植的方式避免死锁,请在单独的线程中向子进程写入数据:
#!/usr/bin/env python
from subprocess import Popen, PIPE
from threading import Thread
def pump_input(pipe, lines):
    """Write every item of ``lines`` to ``pipe``, then close ``pipe``.

    Meant to run in its own thread so the caller can keep draining the
    child's stdout meanwhile; closing the pipe signals EOF to the child.

    If the child exits before consuming all of its input, the resulting
    broken-pipe error is ignored (the caller still sees the exit through
    the child's stdout EOF and exit status) -- the same policy
    ``Popen.communicate()`` applies to stdin. Other I/O errors propagate.
    """
    import errno
    try:
        with pipe:
            for line in lines:
                pipe.write(line)
    except IOError as e:
        # EPIPE on POSIX, EINVAL on Windows: the reading end has gone away.
        if e.errno not in (errno.EPIPE, errno.EINVAL):
            raise
import sys
p = Popen(path, stdin=PIPE, stdout=PIPE, bufsize=1)
# Feed the child's stdin from a separate thread so this thread can keep
# draining stdout: neither pipe can then fill up and block the other side.
writer = Thread(target=pump_input, args=[p.stdin, lines])
writer.start()
# Raw byte sink: sys.stdout.buffer on Python 3, sys.stdout itself on Python 2.
out = getattr(sys.stdout, 'buffer', sys.stdout)
with p.stdout:
    for line in iter(p.stdout.readline, b''): # read output
        # Same output as Python 2's ``print line,`` (which is a syntax
        # error on Python 3): the line already ends with its newline.
        out.write(line)
writer.join()  # all input has been written (or the pipe broke)
p.wait()
参见:Python: read streaming input from subprocess.communicate()
我正在向标准输入写入大量数据。
如何确保它不阻塞?
# Question code: send one large string to the child, then read one line back.
p=subprocess.Popen([path],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
# NOTE(review): this write can block once the OS pipe buffer is full if the
# child is itself blocked writing to its stdout, which nobody reads until the
# readline() below -- each side then waits on the other (pipe deadlock).
p.stdin.write('A very very very large amount of data')
p.stdin.flush()
# Only the first line of the child's output is consumed here.
output = p.stdout.readline()
在我读取一个大字符串并将其写入之后,程序似乎挂在了 p.stdin.write() 上。
我有大量文件(超过 1000 个)需要依次写入标准输入,
所以我运行了如下循环:
# Question code: one child process per input string, reading one reply line each.
#this loop is repeated for all the files
for stri in lines:
    # A new child process (with a new pair of pipes) is created per item.
    p=subprocess.Popen([path],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
    # NOTE(review): can block if `stri` is larger than the pipe buffer while
    # the child is blocked writing output that is not read yet.
    p.stdin.write(stri)
    output = p.stdout.readline()
    # NOTE(review): p.stdin is never closed and p is never waited on, so
    # pipes and child processes may accumulate across iterations.
    #do some processing
不知为何,它总是挂在第 400 号文件上。该文件很大,内容是一个很长的字符串。
我怀疑是阻塞问题。
只有从第 0 个文件迭代到第 1000 个文件时才会出现这个问题;如果我直接从第 400 个文件开始,则不会出错。
您可能需要使用 Popen.communicate()。
如果您向 stdin 写入大量数据,而子进程在此期间向 stdout 产生输出,那么子进程的 stdout 管道缓冲区可能会在所有 stdin 数据被处理之前就被填满。此时子进程在写入 stdout 时阻塞(因为您没有读取它),而您在写入 stdin 时也被阻塞——双方互相等待,形成死锁。
Popen.communicate() 可以同时写入 stdin 并读取 stdout/stderr,从而避免上述问题。
注意:Popen.communicate() 只适用于输入和输出数据都能放进内存(即数据量不太大)的情况。
更新:如果您决定使用线程,下面是一个父进程和子进程的示例实现,您可以根据自己的需要进行修改:
parent.py:
#!/usr/bin/env python2
import os
import sys
import subprocess
import threading
import Queue
class MyStreamingSubprocess(object):
    """Feed items to a child process and collect its replies via two threads.

    A writer thread sends queued items to the child's stdin while a reader
    thread keeps draining the child's stdout, so an unread stdout pipe can
    never stop the child from consuming its stdin (and vice versa).

    The request/response pairing assumes the child answers every item with
    exactly one line of output; otherwise replies get out of step.
    """

    def __init__(self, *argv):
        """Start the child process ``argv`` and the reader/writer threads."""
        self.process = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        # Items to send to the child; None means "close stdin and stop".
        self.stdin_queue = Queue.Queue()
        # Lines read from the child; '' on EOF, None on an I/O error.
        self.stdout_queue = Queue.Queue()
        self.stdin_thread = threading.Thread(target=self._stdin_writer_thread)
        self.stdout_thread = threading.Thread(target=self._stdout_reader_thread)
        self.stdin_thread.start()
        self.stdout_thread.start()

    def process_item(self, item):
        """Send ``item`` to the child and return its next line of output.

        Returns an empty string if the child closed its stdout, or None if
        writing to / reading from the child raised IOError.
        """
        self.stdin_queue.put(item)
        return self.stdout_queue.get()

    def terminate(self):
        """Close the child's stdin, stop the child and wait for it.

        Returns the exit status reported by ``Popen.wait()``.
        """
        self.stdin_queue.put(None)
        # Only signal a child that is still running: signalling one that has
        # already exited raises an error on some (older, Windows) Pythons.
        if self.process.poll() is None:
            self.process.terminate()
        self.stdin_thread.join()
        self.stdout_thread.join()
        return self.process.wait()

    def _stdin_writer_thread(self):
        """Write queued items to the child's stdin until told to stop.

        Always closes the child's stdin on exit, so the child sees EOF.
        """
        try:
            while 1:
                item = self.stdin_queue.get()
                if item is None:
                    # End of input: the finally clause closes stdin, which
                    # signals the child that the end of the input has been
                    # reached (some console programs stop when reading from
                    # stdin returns an empty string).
                    break
                try:
                    self.process.stdin.write(item)
                    # Push the item through now; with a buffered stdin pipe
                    # (e.g. Python 3's default bufsize) it would otherwise
                    # sit in our buffer while process_item() waits for the
                    # reply -- a deadlock.
                    self.process.stdin.flush()
                except IOError:
                    # The child is gone: make sure the pending
                    # process_item() call doesn't deadlock.
                    self.stdout_queue.put(None)
                    break
        finally:
            try:
                self.process.stdin.close()
            except IOError:
                # Closing flushes any buffered bytes, which fails with a
                # broken pipe if the child already exited; there is nobody
                # left to deliver them to, so ignore it.
                pass

    def _stdout_reader_thread(self):
        """Forward each line of the child's stdout to ``stdout_queue``.

        Stops after queueing an empty string (the child closed its stdout)
        or None (reading raised IOError).
        """
        while 1:
            try:
                output = self.process.stdout.readline()
            except IOError:
                output = None
            self.stdout_queue.put(output)
            # output is empty string if the process has
            # finished or None if an IOError occurred
            if not output:
                break
if __name__ == '__main__':
    # Interactive driver: send each entered line to child.py and show its reply.
    child_script_path = os.path.join(os.path.dirname(__file__), 'child.py')
    # -u makes the child's stdout unbuffered so every reply arrives at once.
    process = MyStreamingSubprocess(sys.executable, '-u', child_script_path)
    try:
        while 1:
            item = raw_input('Enter an item to process (leave empty and press ENTER to exit): ')
            if not item:
                break
            # child.py answers each input line with one output line.
            result = process.process_item(item + '\n')
            if result:
                # The reply still ends with the child's newline; strip it so
                # print() doesn't emit an extra blank line after each result.
                print('Result: ' + result.rstrip('\n'))
            else:
                print('Error processing item! Exiting.')
                break
    finally:
        print('Terminating child process...')
        process.terminate()
        print('Finished.')
child.py:
#!/usr/bin/env python2
import sys
# Echo every input line back to the parent, prefixed with 'Processed: '.
while 1:
    item = sys.stdin.readline()
    if not item:
        # readline() returns '' only at EOF (the parent closed our stdin);
        # without this check the loop spins forever writing 'Processed: '.
        break
    sys.stdout.write('Processed: ' + item)
    # Deliver the reply immediately even when not started with python -u.
    sys.stdout.flush()
注意:reader/writer 线程中捕获了 IOError,用于处理子进程退出、崩溃或被杀死的情况。
要以可移植的方式避免死锁,请在单独的线程中向子进程写入数据:
#!/usr/bin/env python
from subprocess import Popen, PIPE
from threading import Thread
def pump_input(pipe, lines):
    """Write every item of ``lines`` to ``pipe``, then close ``pipe``.

    Meant to run in its own thread so the caller can keep draining the
    child's stdout meanwhile; closing the pipe signals EOF to the child.

    If the child exits before consuming all of its input, the resulting
    broken-pipe error is ignored (the caller still sees the exit through
    the child's stdout EOF and exit status) -- the same policy
    ``Popen.communicate()`` applies to stdin. Other I/O errors propagate.
    """
    import errno
    try:
        with pipe:
            for line in lines:
                pipe.write(line)
    except IOError as e:
        # EPIPE on POSIX, EINVAL on Windows: the reading end has gone away.
        if e.errno not in (errno.EPIPE, errno.EINVAL):
            raise
import sys
p = Popen(path, stdin=PIPE, stdout=PIPE, bufsize=1)
# Feed the child's stdin from a separate thread so this thread can keep
# draining stdout: neither pipe can then fill up and block the other side.
writer = Thread(target=pump_input, args=[p.stdin, lines])
writer.start()
# Raw byte sink: sys.stdout.buffer on Python 3, sys.stdout itself on Python 2.
out = getattr(sys.stdout, 'buffer', sys.stdout)
with p.stdout:
    for line in iter(p.stdout.readline, b''): # read output
        # Same output as Python 2's ``print line,`` (which is a syntax
        # error on Python 3): the line already ends with its newline.
        out.write(line)
writer.join()  # all input has been written (or the pipe broke)
p.wait()
参见:Python: read streaming input from subprocess.communicate()