如何将数据从 Python asyncio 套接字服务器发送到子进程?

How to send data from a Python asyncio socket server to a subprocess?

Python 3.6

这个程序:

  1. 将 ffmpeg 作为子进程启动
  2. 等待套接字连接
  3. 在套接字上接收 PNG 图像
  4. 将 PNG 图像发送到 ffmpeg 的标准输入

问题出在第 4 步。我不知道如何将接收到的 PNG 图像从协程发送到 ffmpeg 子进程的标准输入。谁能给我指出正确的方向,将 PNG 图像发送到 ffmpeg 子进程的标准输入?

编辑:澄清一下——这段代码本身可以正常运行,它能通过套接字接收 PNG 文件。我只是不知道如何把 PNG 发送到 ffmpeg 的标准输入。我写过很多 Python 代码,但对 asyncio 还是新手,这些部分是如何联系在一起的对我来说仍是个谜。

谢谢!

import asyncio
import argparse, sys
import sys
import base64
from struct import unpack

# Command-line interface: two plain optional flags whose presence is enforced
# by hand below rather than by argparse itself.
parser = argparse.ArgumentParser()
parser.add_argument('--port', help='ffmpeg listen port')
parser.add_argument('--outputfilename', help='ffmpeg output filename')
args = parser.parse_args()

# Check the options in declaration order and stop with exit status 1 at the
# first one that is missing or empty.
for _value, _complaint in (
    (args.port, "port is required"),
    (args.outputfilename, "outputfilename is required"),
):
    if not _value:
        print(_complaint)
        sys.exit(1)

async def _read_stream(stream, cb):
    """Feed every line read from *stream* to *cb* until the stream runs dry.

    ``stream`` must offer an awaitable ``readline()``. Reading stops at the
    first falsy result (an empty read). ``cb`` is called synchronously with
    each line exactly as it was read.
    """
    chunk = await stream.readline()
    while chunk:
        cb(chunk)
        chunk = await stream.readline()

async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    """Run *cmd* as a child process and stream its output to the callbacks.

    Args:
        cmd: Sequence holding the program and its arguments, handed to
            ``asyncio.create_subprocess_exec``.
        stdout_cb: Called with each line (bytes) read from the child's stdout.
        stderr_cb: Called with each line (bytes) read from the child's stderr.

    Returns:
        The child's exit status, once both output streams have reached EOF
        and the process has terminated.

    The child's stdin is opened as a pipe as well, but nothing in this
    function writes to it or closes it.
    """
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # gather() wraps the coroutines in tasks itself. asyncio.wait() stopped
    # accepting bare coroutines (deprecated in 3.8, rejected from 3.11) and
    # left any exception raised inside a reader unretrieved; gather()
    # propagates it to the caller instead.
    await asyncio.gather(
        _read_stream(process.stdout, stdout_cb),
        _read_stream(process.stderr, stderr_cb),
    )
    return await process.wait()


def process_stderr(line):
    """Echo one line of ffmpeg's stderr and exit once the output file is named.

    ``line`` arrives as bytes. After the decoded text is printed, the program
    terminates with status 0 if that text contains both the word "Output" and
    the configured output filename.
    """
    # ffmpeg only finishes and writes the output file when its input is closed,
    # so the completion message shows up on stderr only after the socket/stdin
    # (or whatever feeds it) has been closed.
    text = line.decode()
    print(text)
    if "Output" in text and args.outputfilename in text:
        print('finished!!!!')
        sys.exit(0)

def process_stdout(line):
    """Print one line of the child's stdout behind a ``STDOUT:`` prefix."""
    message = "STDOUT: %s" % line
    print(message)

def spawn_ffmpeg(listenport, outputfilename, framerate=30, format='webm'):
    """Return the ffmpeg command line as a single space-separated string.

    Despite the name, no process is started here; the caller splits the
    string and launches it. The command reads an image stream from stdin
    (``pipe:0``) and writes a VP9 encode to ``outputfilename`` under a fixed
    SFTP directory.

    ``listenport`` and ``format`` are accepted but not used by the body.
    """
    outputdirectory = "sftp://username:password@10.0.0.196/var/www/static/"
    input_type = "pipe:0"  # stdin

    # Each fragment carries its own trailing space, so a plain join rebuilds
    # the command exactly, including the double space after "ffmpeg".
    fragments = (
        "ffmpeg  ",
        "-loglevel 56 ",
        f"-y -framerate {framerate} ",
        "-f image2pipe ",
        f"-i {input_type} ",
        "-c:v libvpx-vp9 ",
        "-b:v 1024k ",
        "-q:v 0 ",
        "-pix_fmt yuva420p ",
        f"{outputdirectory}{outputfilename} ",
    )
    return "".join(fragments)


async def socket_png_receiver(reader, writer):
    """Handle one client connection carrying length-prefixed PNG frames.

    Each frame is a 4-byte unsigned length in network (big-endian) byte order
    followed by base64 text, which is decoded back to raw PNG bytes. A length
    of 0 ends the loop. The decoded frames are not forwarded anywhere yet, and
    ``writer`` is never used.
    """
    while True:
        # first the client sends the length of the data to us
        # NOTE(review): StreamReader.read(n) returns *at most* n bytes; a short
        # read here hands unpack() fewer than 4 bytes and it raises struct.error.
        lengthbuf = await reader.read(4)
        length, = unpack('!I', lengthbuf)
        if length == 0:
            print("length was 0, finish") # a zero length PNG says that there are no more frames
            break
        # then we read the PNG
        # NOTE(review): same caveat as above, this may yield only part of the frame.
        data = await reader.read(length)
        data = data.decode() # from bytes to string
        png_bytes = base64.b64decode(data) # from base64 to bytes
        # next line was just a guess, so I have commented it out.
        #await proc.communicate(png_bytes)
        print("Got PNG, length", length)
    return


# Script entry point: start ffmpeg and the TCP server on one event loop.
# A loop is created and installed explicitly so this also works on Python
# versions where get_event_loop() no longer creates one implicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
command = spawn_ffmpeg("24897", args.outputfilename)
ffmpeg_process = _stream_subprocess(
    command.split(),
    process_stdout,
    process_stderr,
)
# No loop= argument: start_server() uses the current event loop, and the
# parameter was removed in Python 3.10.
coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port)
several_futures = asyncio.gather(ffmpeg_process, coro)
# gather() yields one result per awaitable, in order: ffmpeg's exit status and
# then the Server object. Calling .close() on the whole list raised
# AttributeError.
_, server = loop.run_until_complete(several_futures)
server.close()
loop.close()

这是 @user4815162342 建议的更改:

import asyncio
import argparse, sys
import sys
import base64
from struct import unpack

# Command-line interface. argparse sees both flags as optional; the checks
# below make them mandatory and restrict the output to .webm files.
parser = argparse.ArgumentParser()
parser.add_argument('--port', help='ffmpeg listen port')
parser.add_argument('--outputfilename', help='ffmpeg output filename')
args = parser.parse_args()


def _reject(complaint):
    """Print *complaint* and terminate with exit status 1."""
    print(complaint)
    sys.exit(1)


if not args.port:
    _reject("port is required")
if not args.outputfilename:
    _reject("outputfilename is required")
if not args.outputfilename.endswith('.webm'):
    _reject("outputfilename must end with '.webm'")

async def _read_stream(stream, cb):
    """Pump lines from *stream* into *cb* until an empty read is returned.

    ``stream`` needs an awaitable ``readline()``; ``cb`` is a plain callable
    that receives each line unchanged.
    """
    while True:
        received = await stream.readline()
        if not received:
            # An empty read ends the pump.
            return
        cb(received)


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    """Run *cmd* as a child process and stream its output to the callbacks.

    The created process object is published as the module-level ``process``
    so that other coroutines can write to the child's stdin.

    Args:
        cmd: Sequence holding the program and its arguments, handed to
            ``asyncio.create_subprocess_exec``.
        stdout_cb: Called with each line (bytes) read from the child's stdout.
        stderr_cb: Called with each line (bytes) read from the child's stderr.

    Returns:
        The child's exit status, once both output streams have reached EOF
        and the process has terminated.
    """
    global process
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # gather() wraps the coroutines in tasks itself. asyncio.wait() stopped
    # accepting bare coroutines (deprecated in 3.8, rejected from 3.11) and
    # left any exception raised inside a reader unretrieved; gather()
    # propagates it to the caller instead.
    await asyncio.gather(
        _read_stream(process.stdout, stdout_cb),
        _read_stream(process.stderr, stderr_cb),
    )
    return await process.wait()


def process_stderr(line):
    """Print a decoded ffmpeg stderr line; exit 0 when it names the output file.

    The exit is triggered only by a line containing both "Output" and the
    configured output filename. Every other line is just echoed.
    """
    # ffmpeg finishes processing and writes the output file when its input is
    # closed, so the completion message reaches stderr only after the socket
    # or stdin (or whatever feeds it) is closed.
    decoded = line.decode()
    print(decoded)
    names_output_file = "Output" in decoded and args.outputfilename in decoded
    if not names_output_file:
        return
    print('finished!!!!')
    sys.exit(0)


def process_stdout(line):
    """Echo a line from the child's stdout with a ``STDOUT:`` tag in front."""
    tagged = "STDOUT: %s" % line
    print(tagged)


def spawn_ffmpeg(listenport, outputfilename, framerate=30, format='webm'):
    """Build, print and return the ffmpeg command line as one string.

    No process is started here; the caller splits the string and launches it.
    ffmpeg is told to read its input from stdin (``pipe:0``) and to write
    ``outputfilename`` under a fixed SFTP directory.

    ``listenport`` and ``format`` are accepted but not used by the body.
    """
    outputdirectory = "sftp://username:password@10.0.0.196/var/www/static/"
    input_type = "pipe:0"  # stdin

    # Every piece ends in a space, so concatenation alone reproduces the
    # command, including the double space after "ffmpeg".
    pieces = [
        "ffmpeg  ",
        "-loglevel 56 ",
        "-y ",
        f"-framerate {framerate} ",
        f"-i {input_type} ",
        f"{outputdirectory}{outputfilename} ",
    ]
    params = "".join(pieces)

    print(params)
    return params


async def socket_png_receiver(reader, writer):
    """Relay length-prefixed, base64-encoded PNG frames from a client to ffmpeg.

    Wire format, repeated per frame: a 4-byte unsigned length in network
    (big-endian) byte order, then exactly that many bytes of base64 text.
    A length of 0 means there are no more frames; ffmpeg's stdin is then
    closed so that ffmpeg sees end-of-input and can finish the output file.

    Args:
        reader: Stream the frames are read from (the ``StreamReader`` that
            ``asyncio.start_server`` passes to its client callback).
        writer: The matching ``StreamWriter``. Nothing is sent on it; it is
            closed when the handler finishes.

    Relies on the module-level ``process`` published by ``_stream_subprocess``.
    """
    try:
        while True:
            # first the client sends the length of the data to us
            lengthbuf = await reader.readexactly(4)
            length, = unpack('!I', lengthbuf)
            if length == 0:
                print("length was 0, finish")  # a zero length PNG says that there are no more frames
                # Closing the pipe delivers EOF to ffmpeg's stdin.
                process.stdin.close()
                break
            # then we read the PNG
            print("Got PNG, length", length)
            data = await reader.readexactly(length)
            print(data)
            png_bytes = base64.b64decode(data)  # from base64 to bytes
            process.stdin.write(png_bytes)
            # write() only buffers; drain() waits until the pipe has caught up
            # so a fast client cannot grow the buffer without bound.
            await process.stdin.drain()
    except asyncio.IncompleteReadError as exc:
        # The client went away in the middle of a header or frame. ffmpeg's
        # stdin is left open because no end-of-frames marker was received.
        print("client disconnected mid-frame after", len(exc.partial), "bytes")
    finally:
        writer.close()
    return


# Script entry point: run ffmpeg and the TCP frame server on one event loop.
# The loop is created and installed explicitly, which also works on Python
# versions where get_event_loop() no longer creates one implicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
command = spawn_ffmpeg("24897", args.outputfilename)
ffmpeg_process = _stream_subprocess(
    command.split(),
    process_stdout,
    process_stderr,
)
# start_server() binds to the current event loop on its own; its loop=
# parameter was removed in Python 3.10.
coro = asyncio.start_server(socket_png_receiver, '0.0.0.0', args.port)
several_futures = asyncio.gather(ffmpeg_process, coro)
# gather() returns a list with one entry per awaitable, in order: ffmpeg's
# exit status and then the Server. The Server has to be unpacked from it,
# because the list itself has no close() method.
_, server = loop.run_until_complete(several_futures)
server.close()
loop.close()

代码有几个问题:

  • await reader.read(length) 应该改为 await reader.readexactly(length),因为 StreamReader.read 的参数是要读取的最大字节数,实际返回的数据可能更少。

  • proc.communicate(png_bytes)应该改为proc.stdin.write(png_bytes)。此处对 communicate() 的调用不正确,因为您想继续与程序对话,而 communicate() 等待所有流关闭。

  • asyncio.create_subprocess_exec(...) 返回的进程实例必须能被 socket_png_receiver 访问,例如使用 global process 将 process 变量声明为全局变量。(更好的做法是使用类并赋值给 self.process,但这超出了本答案的范围。)

一些潜在问题:

  • 不需要将 data 从字节解码为字符串,base64.b64decode 可以直接接受字节。

  • spawn_ffmpeg() 似乎没有使用其 listenport 参数。