子进程中的多个管道

Multiple pipes in subprocess

我正在尝试在 ruffus 管道中使用 Sailfish,它将多个 fastq 文件作为参数。我使用 python 中的子进程模块执行 Sailfish,但是即使我设置了 shell=True.

,子进程调用中的 <() 也不起作用

这是我要使用python执行的命令:

sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file]

或(最好):

sailfish quant [options] -1 <(gunzip sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip sample2a.fastq.gz sample2b.fastq.gz) -o [output_file]

概括:

someprogram <(someprocess) <(someprocess)

我将如何在 python 中执行此操作?子流程是正确的方法吗?

模拟bash process substitution

#!/usr/bin/env python
from subprocess import check_call

check_call('someprogram <(someprocess) <(anotherprocess)',
           shell=True, executable='/bin/bash')

在 Python 中,您可以使用命名管道:

#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as paths:
    someprogram = Popen(['someprogram'] + paths)
    processes = []
    for path, command in zip(paths, ['someprocess', 'anotherprocess']):
        with open(path, 'wb', 0) as pipe:
            processes.append(Popen(command, stdout=pipe, close_fds=True))
    for p in [someprogram] + processes:
        p.wait()

其中 named_pipes(n) 是:

import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n=1):
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)

实现 bash 进程替换的另一种更好的方法(无需在磁盘上创建命名条目)是使用 /dev/fd/N 文件名(如果可用)作为 . On FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process.要测试可用性,运行:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available

如果失败;尝试将 /dev/fd 符号链接到 proc(5),就像在某些 Linux 上所做的那样:

$ ln -s /proc/self/fd /dev/fd

这是 /dev/fd 基于 someprogram <(someprocess) <(anotherprocess) bash 命令的实现:

#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE

def kill(process):
    if process.poll() is None: # still running
        process.kill()

with ExitStack() as stack: # for proper cleanup
    processes = []
    for command in [['someprocess'], ['anotherprocess']]:  # start child processes
        processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
        stack.callback(kill, processes[-1]) # kill on someprogram exit

    fds = [p.stdout.fileno() for p in processes]
    someprogram = stack.enter_context(
        Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
    for p in processes: # close pipes in the parent
        p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0: # errors shouldn't go unnoticed
   raise CalledProcessError(someprogram.returncode, someprogram.args)

注意:在我的 Ubuntu 机器上,subprocess 代码仅适用于 Python 3.4+,尽管 pass_fds 从 Python 3.2 开始可用。

同时 J.F。 Sebastian 使用命名管道提供了一个答案,可以使用匿名管道来做到这一点。

import shlex
from subprocess import Popen, PIPE

inputcmd0 = "zcat hello.gz" # gzipped file containing "hello"
inputcmd1 = "zcat world.gz" # gzipped file containing "world"

def get_filename(file_):
    return "/dev/fd/{}".format(file_.fileno())

def get_stdout_fds(*processes):
    return tuple(p.stdout.fileno() for p in processes)

# setup producer processes
inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE)
inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE)

# setup consumer process
# pass input processes pipes by "filename" eg. /dev/fd/5
cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout), 
    file1=get_filename(inputproc1.stdout))
print("command is:", cmd)
# pass_fds argument tells Popen to let the child process inherit the pipe's fds
someprogram = Popen(shlex.split(cmd), stdout=PIPE, 
    pass_fds=get_stdout_fds(inputproc0, inputproc1))

output, error = someprogram.communicate()

for p in [inputproc0, inputproc1, someprogram]:
    p.wait()

assert output == b"hello\nworld\n"