使用子进程重定向输入 python
Redirect input with subprocess python
如何将输入重定向 <()
与 python 的 subprocess.Popen
一起使用?
例如,假设我有:
import subprocess
class Test():
def __init__(self):
self.proc = subprocess.Popen(["sort file1.txt file2.txt)"],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def __iter__(self):
return self
def __next__(self):
while True:
line = self.proc.stdout.readline()
if not line:
raise StopIteration
return line.strip().decode('utf-8')
t = Test()
for line in t:
print(line)
上面的工作非常好,但我真的需要命令来做这样的事情:
sort <(python file1.txt) <(python file2.txt)
虽然这似乎没有 运行 任何东西,但即使这样也行不通
sort <(cat file1.txt) <(cat file2.txt)
我怎样才能让它与 python 的子进程一起工作,并逐行遍历结果
您应该告诉 subprocess.Popen() 使用支持 <(..)
语法的 /bin/bash
,而不是默认的 /bin/sh
,后者不支持:
def __init__(self):
self.proc = subprocess.Popen(["sort <(cat file1.txt) <(cat file2.txt)"],
shell=True,
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def __iter__(self):
return self
def __next__(self):
while True:
line = self.proc.stdout.readline()
if not line:
raise StopIteration
return line.strip().decode('utf-8')
t = Test()
for line in t:
print(line)
如何将输入重定向 <()
与 python 的 subprocess.Popen
一起使用?
例如,假设我有:
import subprocess
class Test():
def __init__(self):
self.proc = subprocess.Popen(["sort file1.txt file2.txt)"],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def __iter__(self):
return self
def __next__(self):
while True:
line = self.proc.stdout.readline()
if not line:
raise StopIteration
return line.strip().decode('utf-8')
t = Test()
for line in t:
print(line)
上面的工作非常好,但我真的需要命令来做这样的事情:
sort <(python file1.txt) <(python file2.txt)
虽然这似乎没有 运行 任何东西,但即使这样也行不通
sort <(cat file1.txt) <(cat file2.txt)
我怎样才能让它与 python 的子进程一起工作,并逐行遍历结果
您应该告诉 subprocess.Popen() 使用支持 <(..)
语法的 /bin/bash
,而不是默认的 /bin/sh
,后者不支持:
def __init__(self):
self.proc = subprocess.Popen(["sort <(cat file1.txt) <(cat file2.txt)"],
shell=True,
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def __iter__(self):
return self
def __next__(self):
while True:
line = self.proc.stdout.readline()
if not line:
raise StopIteration
return line.strip().decode('utf-8')
t = Test()
for line in t:
print(line)