Python 缓冲 IO 在使用多个管道进行流式传输时提前结束 [BOUNTY]
Python buffered IO ending early streaming with multiple pipes [BOUNTY]
我正在尝试对通过 yt-dlp 下载的视频进行连续直播。
我需要将这个(可正常工作的)bash 命令移植到 Python。
# The parenthesised group runs the two download pipelines one after another;
# both write to the group's stdout, which is piped into the final ffmpeg.
(
# Each line: youtube-dl writes the video to stdout ("-o -") and ffmpeg
# repackages it as MPEG-TS without re-encoding ("-c copy"), again to stdout.
youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
# Final stage: read the combined stream from stdin, encode the video with
# libx264 and publish it as FLV to the RTMP URL.
) | ffmpeg -re -i - -c:v libx264 -f flv rtmp://127.0.0.1/live/H1P_x5WPF
我的 Python 尝试切断每个视频的最后 ~2 秒。我怀疑虽然第一个管道 yt-dlp 的标准输出为空,但第二个和第三个管道之间仍有数据传输。在视频结尾处,我一直没能找到正确处理这两个管道之间数据的方法。
from subprocess import Popen, PIPE, DEVNULL
# Size in bytes requested by every pipe read() call in the copy loop below.
COPY_BUFSIZE = 65424
# Videos to stream one after another; each entry's "url" is passed to yt-dlp.
playlist = [
{
# 15 second video
"url": "https://youtube.com/watch?v=QiInzFHIDp4"
},
{
# 15 second video
"url": "https://youtube.com/watch?v=QiInzFHIDp4"
},
{
# 15 second video
"url": "https://youtube.com/watch?v=QiInzFHIDp4"
},
]
if __name__ == "__main__":
# Final stage: ffmpeg reads from stdin ("-i", "-"), encodes the video with
# libx264 and sends FLV output to the RTMP URL.
stream_cmd = [
"ffmpeg", "-loglevel", "error",
"-hide_banner", "-re", "-i", "-",
"-c:v", "libx264",
"-f", "flv",
"-b:v", "3000k", "-minrate", "3000k",
"-maxrate", "3000k", "-bufsize", "3000k",
"-r", "25", "-pix_fmt", "yuv420p",
"rtmp://127.0.0.1/live/H1P_x5WPF"
]
print(f'Stream command:\n"{" ".join(stream_cmd)}"')
# Middle stage: ffmpeg reads from stdin and writes MPEG-TS to stdout with
# the streams copied ("-c", "copy").
encoder_cmd = [
"ffmpeg", "-re", "-i", "-", "-f", "mpegts",
"-c", "copy", "-"
]
print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')
# Started once, before the playlist loop: every video's data is written to
# this single process's stdin.
stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)
for video in playlist:
# First stage: yt-dlp writes the downloaded video to stdout ("-o", "-").
yt_dlp_cmd = [
"yt-dlp", "-q",
video["url"],
"-o", "-"
]
print("Now playing: " + video["url"])
# A fresh yt-dlp / encoder pair is spawned for every playlist entry.
with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p:
with Popen(encoder_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL) as encoder_p:
# Copy loop, one pass per chunk: read from yt-dlp, write to the encoder,
# read from the encoder, write to the streamer.
while True:
yt_dlp_buf = yt_dlp_p.stdout.read(COPY_BUFSIZE)
print("READ: yt_dlp")
if not yt_dlp_buf:
print("yt-dlp buffer empty")
# Handle any data in 2nd/3rd pipes before breaking?
# NOTE(review): this break leaves the loop without closing encoder_p.stdin
# and without reading encoder_p.stdout again, so output the encoder has not
# delivered yet is never forwarded to stream_p - likely the cause of the
# truncated video endings; confirm.
break
written = encoder_p.stdin.write(yt_dlp_buf)
print("WRITE: encoder. Bytes: " + str(written))
# NOTE(review): blocking read issued right after a single write; presumably
# it stalls whenever the encoder has no output ready yet - confirm.
encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)
# if not encoder_buf:
# print("encoder_buf empty")
# break
print("READ: encoder")
stream_bytes_written = stream_p.stdin.write(encoder_buf)
print("WRITE: stream, Bytes: " + str(stream_bytes_written))
# NOTE(review): stream_p.stdin is never closed and stream_p is never waited
# on in the visible code.
20 美元 BTC 赏金给任何能提供帮助的人。
在 macOS 上运行 Python 3.6.9。
需要关闭 stdin 管道,才能将子进程中剩余的(缓冲)数据“推送”到 stdout 管道。
例如,在向 encoder_p.stdin 写入所有数据后,添加 encoder_p.stdin.close()。
我不明白你的代码是如何工作的。
在我的机器上,它卡在了 encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE) 这一行。
我使用“写入线程”解决了这个问题。
“写入线程”从 yt_dlp_p 读取数据,并将其写入 encoder_p.stdin。
注意:在你的具体情况下,不使用线程也可能正常工作(因为数据只是经由 FFmpeg 传递,并未被编码),但通常情况下,将输入写入 FFmpeg 之后,编码后的数据并不会立即就绪。
我的代码示例使用 FFplay 子进程播放视频(我们需要视频播放器,因为 RTMP 流式传输需要“侦听器”才能保持流式传输)。
这是一个完整的代码示例:
from subprocess import Popen, PIPE, DEVNULL
import threading
import time
# Number of bytes requested from a pipe per read() call.
COPY_BUFSIZE = 65424

# The same 15 second video queued three times (one separate dict per entry).
playlist = [
    dict(url="https://youtube.com/watch?v=QiInzFHIDp4")
    for _ in range(3)
]
# Writer thread (read from yt-dlp and write to FFmpeg in chunks of COPY_BUFSIZE bytes).
def writer(yt_dlp_proc, encoder_proc):
    """Copy yt-dlp's stdout into the encoder's stdin, then end the input.

    Intended to run in its own thread so that the caller can read the
    encoder's stdout at the same time.

    Args:
        yt_dlp_proc: Process object whose ``stdout`` is read in chunks of
            ``COPY_BUFSIZE`` bytes until an empty read signals end of data.
        encoder_proc: Process object whose ``stdin`` receives every chunk.
            Its stdin is always closed and the process is always waited
            on before this function returns.
    """
    try:
        while True:
            yt_dlp_buf = yt_dlp_proc.stdout.read(COPY_BUFSIZE)
            print("READ: yt_dlp")
            if not yt_dlp_buf:
                print("yt-dlp buffer empty")
                break
            written = encoder_proc.stdin.write(yt_dlp_buf)
            print("WRITE: encoder. Bytes: " + str(written))
    except BrokenPipeError:
        # The encoder stopped reading (e.g. it exited early). Nothing more
        # can be delivered, so stop copying instead of letting the thread
        # die with an uncaught exception.
        print("encoder stdin closed early")
    finally:
        # Closing stdin is what lets the encoder see end-of-input and push
        # its remaining (buffered) data to stdout, so it must happen on
        # every exit path.
        try:
            encoder_proc.stdin.close()
        except BrokenPipeError:
            # Flushing buffered bytes failed because the encoder is already
            # gone; there is nothing left to deliver.
            pass
        encoder_proc.wait()  # Wait for the sub-process to finish.
if __name__ == "__main__":
    rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

    # FFplay is started first, in listen mode, so that a receiver for the
    # RTMP stream exists before the sending FFmpeg is launched.
    ffplay_cmd = ['ffplay', '-listen', '1', '-i', rtmp_url]
    ffplay_process = Popen(ffplay_cmd, stderr=DEVNULL)

    # Streaming stage: reads from stdin, encodes the video with libx264 and
    # sends FLV to the RTMP URL. The argument list is assembled group by
    # group.
    stream_cmd = ["ffmpeg", "-loglevel", "error", "-hide_banner"]
    stream_cmd += ["-re", "-i", "-"]
    stream_cmd += ["-c:v", "libx264", "-f", "flv"]
    stream_cmd += ["-b:v", "3000k", "-minrate", "3000k"]
    stream_cmd += ["-maxrate", "3000k", "-bufsize", "3000k"]
    stream_cmd += ["-r", "25", "-pix_fmt", "yuv420p"]
    stream_cmd.append(rtmp_url)
    print(f'Stream command:\n"{" ".join(stream_cmd)}"')

    # Remux stage: reads from stdin, writes MPEG-TS to stdout, copying the
    # streams.
    encoder_cmd = ["ffmpeg", "-re", "-i", "-",
                   "-f", "mpegts", "-c", "copy", "-"]
    print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')

    # One streaming process is shared by every playlist entry.
    stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)

    for entry in playlist:
        yt_dlp_cmd = ["yt-dlp", "-q", entry["url"], "-o", "-"]
        print("Now playing: " + entry["url"])

        with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p, \
                Popen(encoder_cmd, stdin=PIPE, stdout=PIPE,
                      stderr=DEVNULL) as encoder_p:
            # The writer thread feeds the encoder while this thread drains
            # the encoder's output.
            feeder = threading.Thread(target=writer,
                                      args=(yt_dlp_p, encoder_p))
            feeder.start()

            # Forward encoder output until a read returns no data.
            read_chunk = lambda: encoder_p.stdout.read(COPY_BUFSIZE)
            for chunk in iter(read_chunk, b""):
                print("READ: encoder")
                sent = stream_p.stdin.write(chunk)
                print("WRITE: stream, Bytes: " + str(sent))
            print("encoder_buf empty")

            feeder.join()    # Wait for the writer thread to end.
            yt_dlp_p.wait()  # Wait for yt-dlp to exit.

    # Closing stdin ends the streamer's input so it can finish its output.
    stream_p.stdin.close()
    stream_p.wait()

    time.sleep(3)          # Wait 3 seconds before closing FFplay.
    ffplay_process.kill()  # Forcefully close the FFplay sub-process.
更新:
我使用 pytube 和 concat 滤镜(不使用管道)找到了一个更简单的解决方案。
我不知道这个解决方案是否适合你...
代码示例:
from pytube import YouTube
from subprocess import Popen, run, PIPE, DEVNULL
import time
# The same 15 second video queued three times (one separate dict per entry).
playlist = [
    dict(url="https://youtube.com/watch?v=QiInzFHIDp4")
    for _ in range(3)
]
n = len(playlist)

# Build the graph for FFmpeg's concat filter
# (https://video.stackexchange.com/a/18256/18277).
# Part 1 resets the timestamps of every input, e.g. for input 0:
#   "[0:v]setpts=PTS-STARTPTS[v0];[0:a]asetpts=PTS-STARTPTS[a0];"
# Part 2 lists the relabelled pads in order, e.g. "[v0][a0][v1][a1][v2][a2]".
# Part 3 is the concat filter itself with output pads [v] and [a].
input_ids = range(n)
reset_part = ''.join(
    f'[{i}:v]setpts=PTS-STARTPTS[v{i}];[{i}:a]asetpts=PTS-STARTPTS[a{i}];'
    for i in input_ids
)
pads_part = ''.join(f'[v{i}][a{i}]' for i in input_ids)
filter_complex_str = reset_part + pads_part + f'concat=n={n}:v=1:a=1[v][a]'

# Resolve every YouTube page URL to the URL of its first listed stream and
# emit it as an "-i <url>" pair for the FFmpeg command line.
playlist_url = []
for entry in playlist:
    # https://github.com/pytube/pytube/issues/301
    playlist_url += ['-i', YouTube(entry["url"]).streams[0].url]

rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

# FFplay is started first, in listen mode, so that a receiver for the RTMP
# stream exists before the sending FFmpeg is launched.
ffplay_cmd = ['ffplay', '-listen', '1', '-i', rtmp_url]
ffplay_process = Popen(ffplay_cmd, stderr=DEVNULL)

# A single FFmpeg process takes all inputs, joins them with the filter graph
# and sends the libx264/FLV result to the RTMP URL.
stream_cmd = [
    "ffmpeg", "-loglevel", "error", "-hide_banner", "-re",
    *playlist_url,
    "-filter_complex", filter_complex_str,
    "-map", "[v]", "-map", "[a]",
    "-c:v", "libx264",
    "-f", "flv",
    "-b:v", "3000k", "-minrate", "3000k",
    "-maxrate", "3000k", "-bufsize", "3000k",
    "-r", "25", "-pix_fmt", "yuv420p",
    rtmp_url,
]
run(stream_cmd)

time.sleep(60)         # Wait 60 seconds before closing FFplay.
ffplay_process.kill()  # Forcefully close the FFplay sub-process.
我正在尝试对通过 yt-dlp 下载的视频进行连续直播。我需要将这个(可正常工作的)bash 命令移植到 Python。
# The parenthesised group runs the two download pipelines one after another;
# both write to the group's stdout, which is piped into the final ffmpeg.
(
# Each line: youtube-dl writes the video to stdout ("-o -") and ffmpeg
# repackages it as MPEG-TS without re-encoding ("-c copy"), again to stdout.
youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
# Final stage: read the combined stream from stdin, encode the video with
# libx264 and publish it as FLV to the RTMP URL.
) | ffmpeg -re -i - -c:v libx264 -f flv rtmp://127.0.0.1/live/H1P_x5WPF
我的 Python 尝试切断每个视频的最后 ~2 秒。我怀疑虽然第一个管道 yt-dlp 的标准输出为空,但第二个和第三个管道之间仍有数据传输。在视频结尾处,我一直没能找到正确处理这两个管道之间数据的方法。
from subprocess import Popen, PIPE, DEVNULL
# Size in bytes requested by every pipe read() call in the copy loop below.
COPY_BUFSIZE = 65424
# Videos to stream one after another; each entry's "url" is passed to yt-dlp.
playlist = [
{
# 15 second video
"url": "https://youtube.com/watch?v=QiInzFHIDp4"
},
{
# 15 second video
"url": "https://youtube.com/watch?v=QiInzFHIDp4"
},
{
# 15 second video
"url": "https://youtube.com/watch?v=QiInzFHIDp4"
},
]
if __name__ == "__main__":
# Final stage: ffmpeg reads from stdin ("-i", "-"), encodes the video with
# libx264 and sends FLV output to the RTMP URL.
stream_cmd = [
"ffmpeg", "-loglevel", "error",
"-hide_banner", "-re", "-i", "-",
"-c:v", "libx264",
"-f", "flv",
"-b:v", "3000k", "-minrate", "3000k",
"-maxrate", "3000k", "-bufsize", "3000k",
"-r", "25", "-pix_fmt", "yuv420p",
"rtmp://127.0.0.1/live/H1P_x5WPF"
]
print(f'Stream command:\n"{" ".join(stream_cmd)}"')
# Middle stage: ffmpeg reads from stdin and writes MPEG-TS to stdout with
# the streams copied ("-c", "copy").
encoder_cmd = [
"ffmpeg", "-re", "-i", "-", "-f", "mpegts",
"-c", "copy", "-"
]
print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')
# Started once, before the playlist loop: every video's data is written to
# this single process's stdin.
stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)
for video in playlist:
# First stage: yt-dlp writes the downloaded video to stdout ("-o", "-").
yt_dlp_cmd = [
"yt-dlp", "-q",
video["url"],
"-o", "-"
]
print("Now playing: " + video["url"])
# A fresh yt-dlp / encoder pair is spawned for every playlist entry.
with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p:
with Popen(encoder_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL) as encoder_p:
# Copy loop, one pass per chunk: read from yt-dlp, write to the encoder,
# read from the encoder, write to the streamer.
while True:
yt_dlp_buf = yt_dlp_p.stdout.read(COPY_BUFSIZE)
print("READ: yt_dlp")
if not yt_dlp_buf:
print("yt-dlp buffer empty")
# Handle any data in 2nd/3rd pipes before breaking?
# NOTE(review): this break leaves the loop without closing encoder_p.stdin
# and without reading encoder_p.stdout again, so output the encoder has not
# delivered yet is never forwarded to stream_p - likely the cause of the
# truncated video endings; confirm.
break
written = encoder_p.stdin.write(yt_dlp_buf)
print("WRITE: encoder. Bytes: " + str(written))
# NOTE(review): blocking read issued right after a single write; presumably
# it stalls whenever the encoder has no output ready yet - confirm.
encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)
# if not encoder_buf:
# print("encoder_buf empty")
# break
print("READ: encoder")
stream_bytes_written = stream_p.stdin.write(encoder_buf)
print("WRITE: stream, Bytes: " + str(stream_bytes_written))
# NOTE(review): stream_p.stdin is never closed and stream_p is never waited
# on in the visible code.
20 美元 BTC 赏金给任何能提供帮助的人。
在 macOS 上运行 Python 3.6.9。
需要关闭 stdin 管道,才能将子进程中剩余的(缓冲)数据“推送”到 stdout 管道。
例如,在向 encoder_p.stdin 写入所有数据后,添加 encoder_p.stdin.close()。
我不明白你的代码是如何工作的。
在我的机器上,它卡在了 encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE) 这一行。
我使用“写入线程”解决了这个问题。
“写入线程”从 yt_dlp_p 读取数据,并将其写入 encoder_p.stdin。
注意:在你的具体情况下,不使用线程也可能正常工作(因为数据只是经由 FFmpeg 传递,并未被编码),但通常情况下,将输入写入 FFmpeg 之后,编码后的数据并不会立即就绪。
我的代码示例使用 FFplay 子进程播放视频(我们需要视频播放器,因为 RTMP 流式传输需要“侦听器”才能保持流式传输)。
这是一个完整的代码示例:
from subprocess import Popen, PIPE, DEVNULL
import threading
import time
# Number of bytes requested from a pipe per read() call.
COPY_BUFSIZE = 65424

# The same 15 second video queued three times (one separate dict per entry).
playlist = [
    dict(url="https://youtube.com/watch?v=QiInzFHIDp4")
    for _ in range(3)
]
# Writer thread (read from yt-dlp and write to FFmpeg in chunks of COPY_BUFSIZE bytes).
def writer(yt_dlp_proc, encoder_proc):
    """Copy yt-dlp's stdout into the encoder's stdin, then end the input.

    Intended to run in its own thread so that the caller can read the
    encoder's stdout at the same time.

    Args:
        yt_dlp_proc: Process object whose ``stdout`` is read in chunks of
            ``COPY_BUFSIZE`` bytes until an empty read signals end of data.
        encoder_proc: Process object whose ``stdin`` receives every chunk.
            Its stdin is always closed and the process is always waited
            on before this function returns.
    """
    try:
        while True:
            yt_dlp_buf = yt_dlp_proc.stdout.read(COPY_BUFSIZE)
            print("READ: yt_dlp")
            if not yt_dlp_buf:
                print("yt-dlp buffer empty")
                break
            written = encoder_proc.stdin.write(yt_dlp_buf)
            print("WRITE: encoder. Bytes: " + str(written))
    except BrokenPipeError:
        # The encoder stopped reading (e.g. it exited early). Nothing more
        # can be delivered, so stop copying instead of letting the thread
        # die with an uncaught exception.
        print("encoder stdin closed early")
    finally:
        # Closing stdin is what lets the encoder see end-of-input and push
        # its remaining (buffered) data to stdout, so it must happen on
        # every exit path.
        try:
            encoder_proc.stdin.close()
        except BrokenPipeError:
            # Flushing buffered bytes failed because the encoder is already
            # gone; there is nothing left to deliver.
            pass
        encoder_proc.wait()  # Wait for the sub-process to finish.
if __name__ == "__main__":
    rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

    # FFplay is started first, in listen mode, so that a receiver for the
    # RTMP stream exists before the sending FFmpeg is launched.
    ffplay_cmd = ['ffplay', '-listen', '1', '-i', rtmp_url]
    ffplay_process = Popen(ffplay_cmd, stderr=DEVNULL)

    # Streaming stage: reads from stdin, encodes the video with libx264 and
    # sends FLV to the RTMP URL. The argument list is assembled group by
    # group.
    stream_cmd = ["ffmpeg", "-loglevel", "error", "-hide_banner"]
    stream_cmd += ["-re", "-i", "-"]
    stream_cmd += ["-c:v", "libx264", "-f", "flv"]
    stream_cmd += ["-b:v", "3000k", "-minrate", "3000k"]
    stream_cmd += ["-maxrate", "3000k", "-bufsize", "3000k"]
    stream_cmd += ["-r", "25", "-pix_fmt", "yuv420p"]
    stream_cmd.append(rtmp_url)
    print(f'Stream command:\n"{" ".join(stream_cmd)}"')

    # Remux stage: reads from stdin, writes MPEG-TS to stdout, copying the
    # streams.
    encoder_cmd = ["ffmpeg", "-re", "-i", "-",
                   "-f", "mpegts", "-c", "copy", "-"]
    print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')

    # One streaming process is shared by every playlist entry.
    stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)

    for entry in playlist:
        yt_dlp_cmd = ["yt-dlp", "-q", entry["url"], "-o", "-"]
        print("Now playing: " + entry["url"])

        with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p, \
                Popen(encoder_cmd, stdin=PIPE, stdout=PIPE,
                      stderr=DEVNULL) as encoder_p:
            # The writer thread feeds the encoder while this thread drains
            # the encoder's output.
            feeder = threading.Thread(target=writer,
                                      args=(yt_dlp_p, encoder_p))
            feeder.start()

            # Forward encoder output until a read returns no data.
            read_chunk = lambda: encoder_p.stdout.read(COPY_BUFSIZE)
            for chunk in iter(read_chunk, b""):
                print("READ: encoder")
                sent = stream_p.stdin.write(chunk)
                print("WRITE: stream, Bytes: " + str(sent))
            print("encoder_buf empty")

            feeder.join()    # Wait for the writer thread to end.
            yt_dlp_p.wait()  # Wait for yt-dlp to exit.

    # Closing stdin ends the streamer's input so it can finish its output.
    stream_p.stdin.close()
    stream_p.wait()

    time.sleep(3)          # Wait 3 seconds before closing FFplay.
    ffplay_process.kill()  # Forcefully close the FFplay sub-process.
更新:
我使用 pytube 和 concat 滤镜(不使用管道)找到了一个更简单的解决方案。
我不知道这个解决方案是否适合你...
代码示例:
from pytube import YouTube
from subprocess import Popen, run, PIPE, DEVNULL
import time
# The same 15 second video queued three times (one separate dict per entry).
playlist = [
    dict(url="https://youtube.com/watch?v=QiInzFHIDp4")
    for _ in range(3)
]
n = len(playlist)

# Build the graph for FFmpeg's concat filter
# (https://video.stackexchange.com/a/18256/18277).
# Part 1 resets the timestamps of every input, e.g. for input 0:
#   "[0:v]setpts=PTS-STARTPTS[v0];[0:a]asetpts=PTS-STARTPTS[a0];"
# Part 2 lists the relabelled pads in order, e.g. "[v0][a0][v1][a1][v2][a2]".
# Part 3 is the concat filter itself with output pads [v] and [a].
input_ids = range(n)
reset_part = ''.join(
    f'[{i}:v]setpts=PTS-STARTPTS[v{i}];[{i}:a]asetpts=PTS-STARTPTS[a{i}];'
    for i in input_ids
)
pads_part = ''.join(f'[v{i}][a{i}]' for i in input_ids)
filter_complex_str = reset_part + pads_part + f'concat=n={n}:v=1:a=1[v][a]'

# Resolve every YouTube page URL to the URL of its first listed stream and
# emit it as an "-i <url>" pair for the FFmpeg command line.
playlist_url = []
for entry in playlist:
    # https://github.com/pytube/pytube/issues/301
    playlist_url += ['-i', YouTube(entry["url"]).streams[0].url]

rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

# FFplay is started first, in listen mode, so that a receiver for the RTMP
# stream exists before the sending FFmpeg is launched.
ffplay_cmd = ['ffplay', '-listen', '1', '-i', rtmp_url]
ffplay_process = Popen(ffplay_cmd, stderr=DEVNULL)

# A single FFmpeg process takes all inputs, joins them with the filter graph
# and sends the libx264/FLV result to the RTMP URL.
stream_cmd = [
    "ffmpeg", "-loglevel", "error", "-hide_banner", "-re",
    *playlist_url,
    "-filter_complex", filter_complex_str,
    "-map", "[v]", "-map", "[a]",
    "-c:v", "libx264",
    "-f", "flv",
    "-b:v", "3000k", "-minrate", "3000k",
    "-maxrate", "3000k", "-bufsize", "3000k",
    "-r", "25", "-pix_fmt", "yuv420p",
    rtmp_url,
]
run(stream_cmd)

time.sleep(60)         # Wait 60 seconds before closing FFplay.
ffplay_process.kill()  # Forcefully close the FFplay sub-process.