允许 python 子进程的多个输入

allowing multiple inputs to python subprocess

我有一个与几年前的问题几乎相同的问题:Python subprocess with two inputs 收到了一个答案但没有实施。我希望这篇转发可以帮助我和其他人理清头绪。

如上,我想使用子进程来包装一个需要多个输入的命令行工具。特别是,我想避免将输入文件写入磁盘,而是宁愿使用例如命名管道,如上文所述。那应该读作 "learn how to",因为我承认我以前从未尝试过使用命名管道。我将进一步说明我目前的输入是两个 pandas 数据帧,我想取回一个作为输出。

通用命令行实现:

/usr/local/bin/my_command inputfileA.csv inputfileB.csv -o outputfile

不出所料,我当前的实施不起作用。我没有看到 how/when 数据帧通过命名管道发送到命令进程,非常感谢您的帮助!

import os
import StringIO
import subprocess
import pandas as pd
dfA = pd.DataFrame([[1,2,3],[3,4,5]], columns=["A","B","C"])
dfB = pd.DataFrame([[5,6,7],[6,7,8]], columns=["A","B","C"]) 

# make two FIFOs to host the dataframes
fnA = 'inputA'; os.mkfifo(fnA); ffA = open(fnA,"w")
fnB = 'inputB'; os.mkfifo(fnB); ffB = open(fnB,"w")

# don't know if I need to make two subprocesses to pipe inputs 
ppA  = subprocess.Popen("echo", 
                    stdin =subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE)
ppB  = subprocess.Popen("echo", 
                    stdin = suprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE)

ppA.communicate(input = dfA.to_csv(header=False,index=False,sep="\t"))
ppB.communicate(input = dfB.to_csv(header=False,index=False,sep="\t"))


pope = subprocess.Popen(["/usr/local/bin/my_command",
                        fnA,fnB,"stdout"],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
(out,err) = pope.communicate()

try:
    out = pd.read_csv(StringIO.StringIO(out), header=None,sep="\t")
except ValueError: # fail
    out = ""
    print("\n###command failed###\n")

os.unlink(fnA); os.remove(fnA)
os.unlink(fnB); os.remove(fnB)

所以有几件事情可能会把你搞砸。前面 post 的重要内容是将这些 FIFO 视为普通文件。除了发生的正常情况是,如果您尝试从一个进程中的管道读取而不连接另一个进程以在另一端写入它(反之亦然),它们会阻塞。这就是我可能处理这种情况的方式,我会尽力描述我的想法。


首先,当你在主进程中时,你尝试调用 ffA = open(fnA, 'w') 你 运行 进入我上面谈到的问题 - 的另一端没有人管道从中读取数据,所以在发出命令后,主进程将被阻塞。考虑到这一点,您可能需要更改代码以删除 open() 调用:

# make two FIFOs to host the dataframes
fnA = './inputA';
os.mkfifo(fnA);
fnB = './inputB';
os.mkfifo(fnB);

好的,我们已经制作好管道 'inputA' 和 'inputB',并准备为 reading/writing 打开。为了防止像上面那样发生阻塞,我们需要启动几个子进程来调用open()。由于我对子进程库不是特别熟悉,我将只分叉几个子进程。

for x in xrange(2):

    pid = os.fork()
    if pid == 0:
            if x == 0:
                    dfA.to_csv(open(fnA, 'w'), header=False, index=False, sep='\t')
            else:
                    dfB.to_csv(open(fnB, 'w'), header=False, index=False, sep='\t')
            exit()
    else:
            continue

好的,现在我们让这两个子进程在等待写入各自的 FIFO 时阻塞。现在我们可以 运行 我们的命令连接到管道的另一端并开始读取。

pope = subprocess.Popen(["./my_cmd.sh",
                        fnA,fnB],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
(out,err) = pope.communicate()

try:
    out = pd.read_csv(StringIO.StringIO(out), header=None,sep="\t")
except ValueError: # fail
    out = ""
    print("\n###command failed###\n")

我发现的最后一个注释是取消链接管道似乎将其删除,因此无需调用 remove()

os.unlink(fnA); 
os.unlink(fnB);
print "out: ", out

在我的机器上打印语句产生:

out:     0  1  2
0  1  2  3
1  3  4  5
2  5  6  7
3  6  7  8

顺便说一句,我的命令只是一对 cat 语句:

#!/bin/bash

cat 
cat 

您不需要额外的进程来将数据传递给子进程而不将其写入磁盘:

#!/usr/bin/env python
import os
import shutil
import subprocess
import tempfile
import threading
from contextlib import contextmanager    
import pandas as pd

@contextmanager
def named_pipes(count):
    dirname = tempfile.mkdtemp()
    try:
        paths = []
        for i in range(count):
            paths.append(os.path.join(dirname, 'named_pipe' + str(i)))
            os.mkfifo(paths[-1])
        yield paths
    finally:
        shutil.rmtree(dirname)

def write_command_input(df, path):
    df.to_csv(path, header=False,index=False, sep="\t")

dfA = pd.DataFrame([[1,2,3],[3,4,5]], columns=["A","B","C"])
dfB = pd.DataFrame([[5,6,7],[6,7,8]], columns=["A","B","C"])

with named_pipes(2) as paths:
    p = subprocess.Popen(["cat"] + paths, stdout=subprocess.PIPE)
    with p.stdout:
        for df, path in zip([dfA, dfB], paths):
            t = threading.Thread(target=write_command_input, args=[df, path]) 
            t.daemon = True
            t.start()
        result = pd.read_csv(p.stdout, header=None, sep="\t")
p.wait()

cat用于演示。您应该改用您的命令 ("/usr/local/bin/my_command")。我假设您不能使用标准输入传递数据,而必须通过文件传递输入。结果从子进程的标准输出中读取。