接收器在失败时重新启动而不停止管道
Sink restart on failure without stopping the pipeline
今天,我决定将基于 gst-launch
的小脚本转换为真正的 Python/GStreamer 应用程序,以添加一些功能。
感谢 tee
.
,我开发了一个小程序,可以将音频从我的麦克风发送到 Icecast (shout2send
) 和本地存储 (filesink
)
有时 shout2send
可能会由于网络问题而停止。我想每隔 N 秒重新启动此元素,直到连接恢复,而不停止管道,因为本地音频文件不应受到网络条件的影响。
这是我尝试过的:
- stopping/starting 网络错误后一秒的管道(结果:流式传输工作,本地文件被 t运行 处理)
- 取消与
tee
的链接,将 shout2send
状态设置为 NULL
并将其从管道中删除(结果:GStreamer 严重错误,如 Trying to dispose element ... but it is in PLAYING instead of the NULL state
)
- 试图了解如何在这种情况下使用 pad(结果:与上面相同,但涉及更多代码)
我该怎么办?
我的代码如下所示:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import GLib
from gi.repository import Gst
# [...]
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pass # TODO: restart the element
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
pipeline = Gst.Pipeline()
message_bus = pipeline.get_bus()
message_bus.add_signal_watch()
message_bus.connect('message', message_handler)
# [...]
tee.link(queue0)
queue0.link(filesink)
tee.link(queue1)
queue1.link(shout2send)
更新 (9/12/15):添加了非工作代码 + 日志
我尝试遵循 "Dynamically changing the pipeline" fro GStreamer doc,但我的代码不起作用。
def event_probe(pad, info, *args):
Gst.Pad.remove_probe(pad, info)
queue1.unlink(shout2send)
tee.unlink(queue1)
pipeline.remove(shout2send)
pipeline.remove(queue1)
return Gst.PadProbeReturn.OK
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pad = queue1.get_static_pad('src')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
如果我使用 GST_DEBUG=3
运行 我的脚本并在流式传输时重新启动 Icecast,我会看到以下内容:
[...]
0:00:02.142033258 5462 0x55e414d900a0 WARN shout2 gstshout2.c:674:gst_shout2send_render:<shout2send> error: shout_send() failed: Socket error
0:00:02.658137998 5462 0x55e414d90140 WARN basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: Internal data flow error.
0:00:02.658169752 5462 0x55e414d90140 WARN basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: streaming task paused, reason error (-5)
(GLib.Error('Internal data flow error.', 'gst-stream-error-quark', 1), 'gstbasesrc.c(2943): gst_base_src_loop (): /GstPipeline:pipeline0/GstPulseSrc:pulsesrc:\nstreaming task paused, reason error (-5)')
0:00:02.658628129 5462 0x7f6ba8002a30 WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
感谢 otopolsky 的评论,我做到了:)
我做错了什么:
- 元素必须设置为
NULL
:这很重要
oggmux
必须留在 tee
之后,在两个子管道上:否则 Icecast 将列出流而无法提供它。对 opusenc
做同样的事情
建议:
- 没有必要取消链接您不需要的每个元素:只需在需要的地方断开
- 没有必要从管道中删除您不需要的每个元素:如果您想重新使用它们,请保留它们
最终代码(重新连接工作正常且独立于本地 encoding/recording):
def event_probe2(pad, info, *args):
Gst.Pad.remove_probe(pad, info.id)
tee.link(opusenc1)
opusenc1.set_state(Gst.State.PLAYING)
oggmux1.set_state(Gst.State.PLAYING)
queue1.set_state(Gst.State.PLAYING)
shout2send.set_state(Gst.State.PLAYING)
return Gst.PadProbeReturn.OK
def reconnect():
pad = tee.get_static_pad('src_1')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe2, None)
def event_probe(pad, info, *args):
Gst.Pad.remove_probe(pad, info.id)
tee.unlink(opusenc1)
opusenc1.set_state(Gst.State.NULL)
oggmux1.set_state(Gst.State.NULL)
queue1.set_state(Gst.State.NULL)
shout2send.set_state(Gst.State.NULL)
GLib.timeout_add_seconds(interval, reconnect)
return Gst.PadProbeReturn.OK
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pad = tee.get_static_pad('src_1')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
小问题:
- 我使用
tee.get_static_pad('src_1')
,但我想我可以在某处获取 src id,而不是使用固定值
- 可能整个事情可以写成更好的形式(但这是我第一个使用 Python+Gstreamer 的程序,它可以工作,所以我很好)
- 为了避免数据丢失,我在
pipeline.send_event(Gst.Event.new_eos())
后一秒调用了 pipeline.set_state(Gst.State.NULL)
,但我仍然收到 WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
之类的消息
今天,我决定将基于 gst-launch
的小脚本转换为真正的 Python/GStreamer 应用程序,以添加一些功能。
感谢 tee
.
shout2send
) 和本地存储 (filesink
)
有时 shout2send
可能会由于网络问题而停止。我想每隔 N 秒重新启动此元素,直到连接恢复,而不停止管道,因为本地音频文件不应受到网络条件的影响。
这是我尝试过的:
- stopping/starting 网络错误后一秒的管道(结果:流式传输工作,本地文件被 t运行 处理)
- 取消与
tee
的链接,将shout2send
状态设置为NULL
并将其从管道中删除(结果:GStreamer 严重错误,如Trying to dispose element ... but it is in PLAYING instead of the NULL state
) - 试图了解如何在这种情况下使用 pad(结果:与上面相同,但涉及更多代码)
我该怎么办?
我的代码如下所示:
import gi
gi.require_version("Gst", "1.0")
from gi.repository import GLib
from gi.repository import Gst
# [...]
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pass # TODO: restart the element
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
pipeline = Gst.Pipeline()
message_bus = pipeline.get_bus()
message_bus.add_signal_watch()
message_bus.connect('message', message_handler)
# [...]
tee.link(queue0)
queue0.link(filesink)
tee.link(queue1)
queue1.link(shout2send)
更新 (9/12/15):添加了非工作代码 + 日志
我尝试遵循 "Dynamically changing the pipeline" fro GStreamer doc,但我的代码不起作用。
def event_probe(pad, info, *args):
Gst.Pad.remove_probe(pad, info)
queue1.unlink(shout2send)
tee.unlink(queue1)
pipeline.remove(shout2send)
pipeline.remove(queue1)
return Gst.PadProbeReturn.OK
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pad = queue1.get_static_pad('src')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
如果我使用 GST_DEBUG=3
运行 我的脚本并在流式传输时重新启动 Icecast,我会看到以下内容:
[...]
0:00:02.142033258 5462 0x55e414d900a0 WARN shout2 gstshout2.c:674:gst_shout2send_render:<shout2send> error: shout_send() failed: Socket error
0:00:02.658137998 5462 0x55e414d90140 WARN basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: Internal data flow error.
0:00:02.658169752 5462 0x55e414d90140 WARN basesrc gstbasesrc.c:2943:gst_base_src_loop:<pulsesrc> error: streaming task paused, reason error (-5)
(GLib.Error('Internal data flow error.', 'gst-stream-error-quark', 1), 'gstbasesrc.c(2943): gst_base_src_loop (): /GstPipeline:pipeline0/GstPulseSrc:pulsesrc:\nstreaming task paused, reason error (-5)')
0:00:02.658628129 5462 0x7f6ba8002a30 WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
感谢 otopolsky 的评论,我做到了:)
我做错了什么:
- 元素必须设置为
NULL
:这很重要 oggmux
必须留在tee
之后,在两个子管道上:否则 Icecast 将列出流而无法提供它。对opusenc
做同样的事情
建议:
- 没有必要取消链接您不需要的每个元素:只需在需要的地方断开
- 没有必要从管道中删除您不需要的每个元素:如果您想重新使用它们,请保留它们
最终代码(重新连接工作正常且独立于本地 encoding/recording):
def event_probe2(pad, info, *args):
Gst.Pad.remove_probe(pad, info.id)
tee.link(opusenc1)
opusenc1.set_state(Gst.State.PLAYING)
oggmux1.set_state(Gst.State.PLAYING)
queue1.set_state(Gst.State.PLAYING)
shout2send.set_state(Gst.State.PLAYING)
return Gst.PadProbeReturn.OK
def reconnect():
pad = tee.get_static_pad('src_1')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe2, None)
def event_probe(pad, info, *args):
Gst.Pad.remove_probe(pad, info.id)
tee.unlink(opusenc1)
opusenc1.set_state(Gst.State.NULL)
oggmux1.set_state(Gst.State.NULL)
queue1.set_state(Gst.State.NULL)
shout2send.set_state(Gst.State.NULL)
GLib.timeout_add_seconds(interval, reconnect)
return Gst.PadProbeReturn.OK
def message_handler(bus, message):
if message.type == Gst.MessageType.ERROR:
if message.src == shout2send:
pad = tee.get_static_pad('src_1')
pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, event_probe, None)
else:
print(message.parse_error())
pipeline.set_state(Gst.State.NULL)
exit(1)
else:
print(message.type)
小问题:
- 我使用
tee.get_static_pad('src_1')
,但我想我可以在某处获取 src id,而不是使用固定值 - 可能整个事情可以写成更好的形式(但这是我第一个使用 Python+Gstreamer 的程序,它可以工作,所以我很好)
- 为了避免数据丢失,我在
pipeline.send_event(Gst.Event.new_eos())
后一秒调用了pipeline.set_state(Gst.State.NULL)
,但我仍然收到WARN audiosrc gstaudiosrc.c:244:audioringbuffer_thread_func:<pulsesrc> error reading data -1 (reason: Success), skipping segment
之类的消息