从 subprocess.Popen 流式传输并使用两个子命令时出现死锁

Deadlock When Streaming From subprocess.Popen And Using Two Subcommands

在我的 wsgi 应用程序中,我必须调用命令行工具来生成数据,有时还必须调用另一个命令行工具来转换该数据。这一切都通过 stdout/stdin 起作用。我正在使用子进程,我以前使用过通信,它工作正常但速度较慢,因为它不是流式传输并且试图将其转换为从 stdout 增量流式传输是导致我出现问题的原因。

旧代码(有效但不流式传输):

generator_process = subprocess.Popen(generator_command, stdout=subprocess.PIPE)
if convert_command:
    convert_process = subprocess.Popen(convert_command, stdout=subprocess.PIPE, stdin=subprocess.PIPE)

output, err = generator_process.communicate()

if convert_command:
    output, err = convert_process.communicate(output)

yield output

当前代码 - 在不需要进行转换时对我来说工作正常。但是卡在 subprocess.stdout.read() 否则(使用 readline 不会以任何方式产生影响):

generator_process = subprocess.Popen(generator_command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, bufsize=1)
convert_process = None
if convert_command:
    convert_process = subprocess.Popen(convert_command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.PIPE, bufsize=1)

while True:
    chunk = generator_process.stdout.read(chunk_size)
    if convert_process:
        if chunk:
            convert_process.stdin.write(chunk)
        chunk = convert_process.stdout.read(chunk_size)
        if chunk: yield chunk
        else: break

    else:
        if chunk: yield chunk
        else: break

请注意,由于这是一个 wsgi 应用程序,所以 asyncio/coroutines 将无法工作(这是我在进入 asyncio 兔子洞后意识到的)。

python 有什么方法可以从子进程流式传输到客户端而不会出现死锁吗?

如果 none 个子进程尝试从标准输入读取,那么我在您的代码中看到的死锁的唯一原因是 .write(chunk).read(chunk_size) 可能会退出sync if convert_process does not return byte for byte(如果 .flush().write(chunk) 之后没有帮助)。

要在 Python 中模拟 generator | convert 命令:

#!/usr/bin/env python3
from functools import partial
from subprocess import Popen, PIPE, DEVNULL

def get_chunks(generator_command, convert_command, chunk_size=1024):
    with Popen(generator_command, stdin=DEVNULL,
               stdout=PIPE, stderr=DEVNULL) as generator_process, \
         Popen(convert_command, stdin=generator_process.stdout, 
               stdout=PIPE, stderr=DEVNULL) as convert_process:
        yield from iter(partial(convert_process.stdout.read, chunk_size), b'')
    return generator_process.returncode, convert_process.returncode