Python 缓冲 ​​IO 结束了使用多个管道的早期流式传输 [BOUNTY]

Python buffered IO ending early streaming with multiple pipes [BOUNTY]

我正在尝试对通过 yt-dlp 下载的视频进行连续直播。 我需要将这个(工作中的)bash 命令移植到 Python.

(
    youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
    youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
) | ffmpeg -re -i - -c:v libx264 -f flv rtmp://127.0.0.1/live/H1P_x5WPF

我的 Python 尝试切断每个视频的最后 ~2 秒。我怀疑虽然第一个管道 yt-dlp 的标准输出为空,但第二个和第三个管道之间仍有数据传输。在视频结尾处,我一直没能找到正确处理这两个管道之间数据的方法。

from subprocess import Popen, PIPE, DEVNULL

COPY_BUFSIZE = 65424

playlist = [
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
]

if __name__ == "__main__":
    stream_cmd = [
        "ffmpeg", "-loglevel", "error",
        "-hide_banner", "-re", "-i", "-",
        "-c:v", "libx264",
        "-f", "flv",
        "-b:v", "3000k", "-minrate", "3000k",
        "-maxrate", "3000k", "-bufsize", "3000k",
        "-r", "25", "-pix_fmt", "yuv420p",
        "rtmp://127.0.0.1/live/H1P_x5WPF"
    ]
    print(f'Stream command:\n"{" ".join(stream_cmd)}"')

    encoder_cmd = [
        "ffmpeg", "-re", "-i", "-", "-f", "mpegts",
        "-c", "copy", "-"
    ]
    print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')

    stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)

    for video in playlist:
        yt_dlp_cmd = [
            "yt-dlp", "-q",
            video["url"],
            "-o", "-"
        ]

        print("Now playing: " + video["url"])

        with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p:
            with Popen(encoder_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL) as encoder_p:
                while True:
                    yt_dlp_buf = yt_dlp_p.stdout.read(COPY_BUFSIZE)
                    print("READ: yt_dlp")
                    if not yt_dlp_buf:
                        print("yt-dlp buffer empty")
                        # Handle any data in 2nd/3rd pipes before breaking?
                        break

                    written = encoder_p.stdin.write(yt_dlp_buf)
                    print("WRITE: encoder. Bytes: " + str(written))

                    encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)
                    # if not encoder_buf:
                    #     print("encoder_buf empty")
                    #     break
                    print("READ: encoder")

                    stream_bytes_written = stream_p.stdin.write(encoder_buf)
                    print("WRITE: stream, Bytes: " + str(stream_bytes_written))

20 美元 BTC 赏金给任何能提供帮助的人。

运行 Python MacOS 上的 3.6.9。

需要关闭 stdin 管道才能将子进程剩余(缓冲)数据“推送”到 stdout 管道。

例如encoder_p.stdin写入所有数据后添加encoder_p.stdin.close().


我不明白你的代码是如何工作的。
在我的机器中,它在 encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE).

处获得堆栈

我使用“编写线程”解决了这个问题。
“写入线程”从 yt_dlp_p 读取数据并将其写入 encoder_p.stdin.

注意:在你的具体情况下,它可以在没有线程的情况下工作(因为数据只是通过 FFmpeg 传递,而不是被编码),但通常,编码数据在将输入写入 FFmpeg 后还没有准备好.


我的代码示例使用 FFplay 子进程播放视频(我们需要视频播放器,因为 RTMP 流式传输需要“侦听器”才能保持流式传输)。


这是一个完整的代码示例:

from subprocess import Popen, PIPE, DEVNULL
import threading
import time

COPY_BUFSIZE = 65424

playlist = [
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
]



# Writer thread (read from yt-dlp and write to FFmpeg in chunks of COPY_BUFSIZE bytes).
def writer(yt_dlp_proc, encoder_proc):
    while True:
        yt_dlp_buf = yt_dlp_proc.stdout.read(COPY_BUFSIZE)
        print("READ: yt_dlp")
        if not yt_dlp_buf:
            print("yt-dlp buffer empty")
            break

        written = encoder_proc.stdin.write(yt_dlp_buf)
        print("WRITE: encoder. Bytes: " + str(written))
    
    encoder_proc.stdin.close()  # Close stdin pipe (closing stdin "pushes" the remaining data to stdout).
    encoder_proc.wait()  # Wait for sub-process finish execution.


if __name__ == "__main__":
    rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

    ffplay_cmd = ['ffplay', '-listen', '1', '-i', rtmp_url] # Start the TCP server first, before the sending client.    
    ffplay_process = Popen(ffplay_cmd, stderr=DEVNULL)  # Use FFplay sub-process for receiving the RTMP video.

    stream_cmd = [
        "ffmpeg", "-loglevel", "error",
        "-hide_banner", "-re", "-i", "-",
        "-c:v", "libx264",
        "-f", "flv",
        "-b:v", "3000k", "-minrate", "3000k",
        "-maxrate", "3000k", "-bufsize", "3000k",
        "-r", "25", "-pix_fmt", "yuv420p",
        rtmp_url #"rtmp://127.0.0.1/live/H1P_x5WPF"
    ]
    print(f'Stream command:\n"{" ".join(stream_cmd)}"')

    encoder_cmd = [
        "ffmpeg", "-re", "-i", "-", "-f", "mpegts",
        "-c", "copy", "-"
    ]
    print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')

    stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)

    for video in playlist:
        yt_dlp_cmd = [
            "yt-dlp", "-q",
            video["url"],
            "-o", "-"
        ]

        print("Now playing: " + video["url"])

        with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p:
            with Popen(encoder_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL) as encoder_p:

                thread = threading.Thread(target=writer, args=(yt_dlp_p, encoder_p))
                thread.start()  # Start writer thread.

                while True:
                    encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)

                    if not encoder_buf:
                        print("encoder_buf empty")
                        break

                    print("READ: encoder")

                    stream_bytes_written = stream_p.stdin.write(encoder_buf)
                    print("WRITE: stream, Bytes: " + str(stream_bytes_written))

        thread.join()  # Wait for writer thread to end.
        yt_dlp_p.wait()

    stream_p.stdin.close()  # Close stdin pipe (closing stdin "pushes" the remaining data to stdout).
    stream_p.wait()  # Wait for sub-process finish execution.


    time.sleep(3)  # Wait 3 seconds before closing FFplay
    ffplay_process.kill()  # Forcefully close FFplay sub-process

更新:

我使用 pytube and concat filter(没有管道)找到了一个更简单的解决方案。

我不知道这个解决方案是否适合你...

代码示例:

from pytube import YouTube
from subprocess import Popen, run, PIPE, DEVNULL
import time

playlist = [
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
    {
        # 15 second video
        "url": "https://youtube.com/watch?v=QiInzFHIDp4"
    },
]

n = len(playlist)

# Build string for concat demuxer https://video.stackexchange.com/a/18256/18277
filter_complex_str = ''
for i in range(n):
    filter_complex_str += f'[{i}:v]setpts=PTS-STARTPTS[v{i}];[{i}:a]asetpts=PTS-STARTPTS[a{i}];'   # "[0:v]setpts=PTS-STARTPTS[v0];[0:a]asetpts=PTS-STARTPTS[a0];[1:v]setpts=PTS-STARTPTS[v1];[1:a]asetpts=PTS-STARTPTS[a1];[2:v]setpts=PTS-STARTPTS[v2];[2:a]asetpts=PTS-STARTPTS[a2]"
for i in range(n):
    filter_complex_str += f'[v{i}][a{i}]'  # ";[v0][a0][v1][a1][v2][a2]"
filter_complex_str += f'concat=n={n}:v=1:a=1[v][a]'

# Get the video stream URL of every YouTube HTTP URL.
# Add -i before each URL (to be used as FFmpeg input).
playlist_url = []
for video in playlist:
    yt = YouTube(video["url"])
    # https://github.com/pytube/pytube/issues/301
    stream_url = yt.streams[0].url  # Get the URL of the video stream
    playlist_url.append('-i')
    playlist_url.append(stream_url)


rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

ffplay_cmd = ['ffplay', '-listen', '1', '-i', rtmp_url]  # Start the TCP server first, before the sending client.
ffplay_process = Popen(ffplay_cmd, stderr=DEVNULL)  # Use FFplay sub-process for receiving the RTMP video.

stream_cmd = [
    "ffmpeg", "-loglevel", "error",
    "-hide_banner", "-re"] + playlist_url + ["-filter_complex",
    filter_complex_str,  # '[0:v][0:a][1:v][1:a][2:v][2:a]concat=n=3:v=1:a=1[v][a]'
    "-map", "[v]", "-map", "[a]",
    "-c:v", "libx264",
    "-f", "flv",
    "-b:v", "3000k", "-minrate", "3000k",
    "-maxrate", "3000k", "-bufsize", "3000k",
    "-r", "25", "-pix_fmt", "yuv420p",
    rtmp_url]

run(stream_cmd)

time.sleep(60)  # Wait 60 seconds before closing FFplay
ffplay_process.kill()  # Forcefully close FFplay sub-process