从 subprocess.Popen 流式传输并使用两个子命令时出现死锁
Deadlock When Streaming From subprocess.Popen And Using Two Subcommands
在我的 wsgi 应用程序中,我必须调用命令行工具来生成数据,有时还必须调用另一个命令行工具来转换该数据。这一切都通过 stdout/stdin 起作用。我正在使用子进程,我以前使用过通信,它工作正常但速度较慢,因为它不是流式传输并且试图将其转换为从 stdout 增量流式传输是导致我出现问题的原因。
旧代码(有效但不流式传输):
generator_process = subprocess.Popen(generator_command, stdout=subprocess.PIPE)
if convert_command:
convert_process = subprocess.Popen(convert_command, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
output, err = generator_process.communicate()
if convert_command:
output, err = convert_process.communicate(output)
yield output
当前代码 - 在不需要进行转换时对我来说工作正常。但是卡在 subprocess.stdout.read() 否则(使用 readline 不会以任何方式产生影响):
generator_process = subprocess.Popen(generator_command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, bufsize=1)
convert_process = None
if convert_command:
convert_process = subprocess.Popen(convert_command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.PIPE, bufsize=1)
while True:
chunk = generator_process.stdout.read(chunk_size)
if convert_process:
if chunk:
convert_process.stdin.write(chunk)
chunk = convert_process.stdout.read(chunk_size)
if chunk: yield chunk
else: break
else:
if chunk: yield chunk
else: break
请注意,由于这是一个 wsgi 应用程序,所以 asyncio/coroutines 将无法工作(这是我在进入 asyncio 兔子洞后意识到的)。
python 有什么方法可以从子进程流式传输到客户端而不会出现死锁吗?
如果 none 个子进程尝试从标准输入读取,那么我在您的代码中看到的死锁的唯一原因是 .write(chunk)
、.read(chunk_size)
可能会退出sync if convert_process
does not return byte for byte(如果 .flush()
在 .write(chunk)
之后没有帮助)。
要在 Python 中模拟 generator | convert
命令:
#!/usr/bin/env python3
from functools import partial
from subprocess import Popen, PIPE, DEVNULL
def get_chunks(generator_command, convert_command, chunk_size=1024):
with Popen(generator_command, stdin=DEVNULL,
stdout=PIPE, stderr=DEVNULL) as generator_process, \
Popen(convert_command, stdin=generator_process.stdout,
stdout=PIPE, stderr=DEVNULL) as convert_process:
yield from iter(partial(convert_process.stdout.read, chunk_size), b'')
return generator_process.returncode, convert_process.returncode
在我的 wsgi 应用程序中,我必须调用命令行工具来生成数据,有时还必须调用另一个命令行工具来转换该数据。这一切都通过 stdout/stdin 起作用。我正在使用子进程,我以前使用过通信,它工作正常但速度较慢,因为它不是流式传输并且试图将其转换为从 stdout 增量流式传输是导致我出现问题的原因。
旧代码(有效但不流式传输):
generator_process = subprocess.Popen(generator_command, stdout=subprocess.PIPE)
if convert_command:
convert_process = subprocess.Popen(convert_command, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
output, err = generator_process.communicate()
if convert_command:
output, err = convert_process.communicate(output)
yield output
当前代码 - 在不需要进行转换时对我来说工作正常。但是卡在 subprocess.stdout.read() 否则(使用 readline 不会以任何方式产生影响):
generator_process = subprocess.Popen(generator_command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, bufsize=1)
convert_process = None
if convert_command:
convert_process = subprocess.Popen(convert_command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.PIPE, bufsize=1)
while True:
chunk = generator_process.stdout.read(chunk_size)
if convert_process:
if chunk:
convert_process.stdin.write(chunk)
chunk = convert_process.stdout.read(chunk_size)
if chunk: yield chunk
else: break
else:
if chunk: yield chunk
else: break
请注意,由于这是一个 wsgi 应用程序,所以 asyncio/coroutines 将无法工作(这是我在进入 asyncio 兔子洞后意识到的)。
python 有什么方法可以从子进程流式传输到客户端而不会出现死锁吗?
如果 none 个子进程尝试从标准输入读取,那么我在您的代码中看到的死锁的唯一原因是 .write(chunk)
、.read(chunk_size)
可能会退出sync if convert_process
does not return byte for byte(如果 .flush()
在 .write(chunk)
之后没有帮助)。
要在 Python 中模拟 generator | convert
命令:
#!/usr/bin/env python3
from functools import partial
from subprocess import Popen, PIPE, DEVNULL
def get_chunks(generator_command, convert_command, chunk_size=1024):
with Popen(generator_command, stdin=DEVNULL,
stdout=PIPE, stderr=DEVNULL) as generator_process, \
Popen(convert_command, stdin=generator_process.stdout,
stdout=PIPE, stderr=DEVNULL) as convert_process:
yield from iter(partial(convert_process.stdout.read, chunk_size), b'')
return generator_process.returncode, convert_process.returncode