Python-twisted:防止 tcp reader 和网络资源之间的缓冲
Python-twisted: Preventing buffering between tcp reader and web resource
我正在尝试使用 Twisted web.Resource 实现 MJPEG 服务器
通过从本身就是上游的 gstreamer 进程读取数据来获取数据
将 MJPEG 数据写入 TCP 端口 localhost:9999。我有这样的东西
现在:
from twisted.internet import reactor, protocol, defer
from twisted.web import server, resource
class MJpegResource(resource.Resource):
def __init__(self, queues):
self.queues = queues
@defer.inlineCallbacks
def deferredRenderer(self, request):
q = defer.DeferredQueue()
self.queues.append([q, request])
while True:
yield q.get()
def render_GET(self, request):
request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto')
self.deferredRenderer(request)
return server.NOT_DONE_YET
class JpegStreamReader(protocol.Protocol):
def dataReceived(self, data):
for (q, req) in self.factory.queues:
req.write(data)
q.put('')
root = File('web')
root.putChild('stream.mjpeg', MJpegResource(queues))
factory = protocol.Factory()
factory.protocol = JpegStreamReader
factory.queues = queues
reactor.listenTCP(9999, factory)
site = server.Site(root)
reactor.listenTCP(80, site)
# spawn gstreamer process which writes to port 9999.
# The gstream process is launched using:
# gst-launch-1.0 -v \
# v4l2src device=/dev/video0 \
# ! video/x-raw,framerate=15/1, width=640, height=480 \
# ! jpegenc \
# ! multipartmux boundary=spionisto \
# ! tcpclientsink host=127.0.0.1 port=9999 \
reactor.run()
所以像这样:
gstreamer --> JpegStreamReader --> MJpegResource
这工作正常,但我发现偶尔,
浏览器远远落后于 "live"(最多 30-40 秒
有时)。一旦我刷新浏览器,MJPEG 流就会跳回
成为 "live"。所以我怀疑 JpegStreamReader 无法
以最快的速度写入对应于 web.http.Request 的 TCP 套接字
gstreamer 正在填充 TCP 套接字 9999 并且正在缓冲
在 JpegStreamReader 的输入队列上。
由于流应该是 "live",我可以将帧丢到
带回视频直播。但是,我不确定如何检测
JpegStreamReader 落后等等?关于如何做的任何建议
让这条管道更像直播?
如果从根本上说有另一种架构可以做到这一点,建议
也将不胜感激。
您可以在 Request
对象上注册生产者。当 Request
的写入缓冲区已满时,它将调用其 pauseProducing
方法。当房间可用时,它将调用 resumeProducing
方法。
您可以使用此信息来丢弃可能无法及时传送的帧。但是,您必须实际识别服务器中的帧(目前您只有 dataReceived
方法将数据作为流传递,而不知道帧的开始或结束位置)。这还有一个问题,即缓冲区满度可能是流中延迟的非常滞后的指标。如果系统中的瓶颈不在从 gstreamer 读取数据并将其写入请求之间,那么向程序的这一部分添加背压敏感性将无济于事。
这是实现 Jean-Paul Calerone 的最终解决方案
建议。请注意,现在我们有一个 JpegProducer class 实现了
PushProducer 接口。当请求暂停时,它会设置一个标志。这个
启用 TCP 流 reader (JpegStreamReader) 不将帧推入
那个特定的生产者,如果它被堵塞了。根据 Jean-Paul 的建议,我
还必须将多部分 MJPEG 流分解成块,以便我们
始终丢帧而不破坏 MJPEG 输出格式。
from twisted.internet import reactor, protocol, defer, interfaces
from twisted.web import server, resource
from zope.interface import implementer
class MJpegResource(resource.Resource):
def __init__(self, queues):
self.queues = queues
def setupProducer(self, request):
producer = JpegProducer(request)
request.notifyFinish().addErrback(self._responseFailed, producer)
request.registerProducer(producer, True)
self.queues.append(producer)
def _responseFailed(self, err, producer):
producer.stopProducing()
def render_GET(self, request):
request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto')
self.setupProducer(request)
return server.NOT_DONE_YET
@implementer(interfaces.IPushProducer)
class JpegProducer(object):
def __init__(self, request):
self.request = request
self.isPaused = False
self.isStopped = False
self.delayedCall = None
def cancelCall(self):
if self.delayedCall:
self.delayedCall.cancel()
self.delayedCall = None
def pauseProducing(self):
self.isPaused = True
self.cancelCall()
def resetPausedFlag(self):
self.isPaused = False
self.delayedCall = None
def resumeProducing(self):
# calling self.cancelCall is defensive. We should not really get
# called with multiple resumeProducing calls without any
# pauseProducing in the middle.
self.cancelCall()
self.delayedCall = reactor.callLater(1, self.resetPausedFlag)
log('producer is requesting to be resumed')
def stopProducing(self):
self.isPaused = True
self.isStopped = True
log('producer is requesting to be stopped')
MJPEG_SEP = '--spionisto\r\n'
class JpegStreamReader(protocol.Protocol):
def __init__(self):
self.tnow = None
def connectionMade(self):
self.data = ''
self.tnow = datetime.now()
def dataReceived(self, data):
self.data += data
chunks = self.data.rsplit(MJPEG_SEP, 1)
dataToSend = ''
if len(chunks) == 2:
dataToSend = chunks[0] + MJPEG_SEP
self.data = chunks[-1]
for producer in self.factory.queues:
if (not producer.isPaused):
producer.request.write(dataToSend)
我正在尝试使用 Twisted web.Resource 实现 MJPEG 服务器 通过从本身就是上游的 gstreamer 进程读取数据来获取数据 将 MJPEG 数据写入 TCP 端口 localhost:9999。我有这样的东西 现在:
from twisted.internet import reactor, protocol, defer
from twisted.web import server, resource
class MJpegResource(resource.Resource):
def __init__(self, queues):
self.queues = queues
@defer.inlineCallbacks
def deferredRenderer(self, request):
q = defer.DeferredQueue()
self.queues.append([q, request])
while True:
yield q.get()
def render_GET(self, request):
request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto')
self.deferredRenderer(request)
return server.NOT_DONE_YET
class JpegStreamReader(protocol.Protocol):
def dataReceived(self, data):
for (q, req) in self.factory.queues:
req.write(data)
q.put('')
root = File('web')
root.putChild('stream.mjpeg', MJpegResource(queues))
factory = protocol.Factory()
factory.protocol = JpegStreamReader
factory.queues = queues
reactor.listenTCP(9999, factory)
site = server.Site(root)
reactor.listenTCP(80, site)
# spawn gstreamer process which writes to port 9999.
# The gstream process is launched using:
# gst-launch-1.0 -v \
# v4l2src device=/dev/video0 \
# ! video/x-raw,framerate=15/1, width=640, height=480 \
# ! jpegenc \
# ! multipartmux boundary=spionisto \
# ! tcpclientsink host=127.0.0.1 port=9999 \
reactor.run()
所以像这样:
gstreamer --> JpegStreamReader --> MJpegResource
这工作正常,但我发现偶尔, 浏览器远远落后于 "live"(最多 30-40 秒 有时)。一旦我刷新浏览器,MJPEG 流就会跳回 成为 "live"。所以我怀疑 JpegStreamReader 无法 以最快的速度写入对应于 web.http.Request 的 TCP 套接字 gstreamer 正在填充 TCP 套接字 9999 并且正在缓冲 在 JpegStreamReader 的输入队列上。
由于流应该是 "live",我可以将帧丢到 带回视频直播。但是,我不确定如何检测 JpegStreamReader 落后等等?关于如何做的任何建议 让这条管道更像直播?
如果从根本上说有另一种架构可以做到这一点,建议 也将不胜感激。
您可以在 Request
对象上注册生产者。当 Request
的写入缓冲区已满时,它将调用其 pauseProducing
方法。当房间可用时,它将调用 resumeProducing
方法。
您可以使用此信息来丢弃可能无法及时传送的帧。但是,您必须实际识别服务器中的帧(目前您只有 dataReceived
方法将数据作为流传递,而不知道帧的开始或结束位置)。这还有一个问题,即缓冲区满度可能是流中延迟的非常滞后的指标。如果系统中的瓶颈不在从 gstreamer 读取数据并将其写入请求之间,那么向程序的这一部分添加背压敏感性将无济于事。
这是实现 Jean-Paul Calerone 的最终解决方案 建议。请注意,现在我们有一个 JpegProducer class 实现了 PushProducer 接口。当请求暂停时,它会设置一个标志。这个 启用 TCP 流 reader (JpegStreamReader) 不将帧推入 那个特定的生产者,如果它被堵塞了。根据 Jean-Paul 的建议,我 还必须将多部分 MJPEG 流分解成块,以便我们 始终丢帧而不破坏 MJPEG 输出格式。
from twisted.internet import reactor, protocol, defer, interfaces
from twisted.web import server, resource
from zope.interface import implementer
class MJpegResource(resource.Resource):
def __init__(self, queues):
self.queues = queues
def setupProducer(self, request):
producer = JpegProducer(request)
request.notifyFinish().addErrback(self._responseFailed, producer)
request.registerProducer(producer, True)
self.queues.append(producer)
def _responseFailed(self, err, producer):
producer.stopProducing()
def render_GET(self, request):
request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto')
self.setupProducer(request)
return server.NOT_DONE_YET
@implementer(interfaces.IPushProducer)
class JpegProducer(object):
def __init__(self, request):
self.request = request
self.isPaused = False
self.isStopped = False
self.delayedCall = None
def cancelCall(self):
if self.delayedCall:
self.delayedCall.cancel()
self.delayedCall = None
def pauseProducing(self):
self.isPaused = True
self.cancelCall()
def resetPausedFlag(self):
self.isPaused = False
self.delayedCall = None
def resumeProducing(self):
# calling self.cancelCall is defensive. We should not really get
# called with multiple resumeProducing calls without any
# pauseProducing in the middle.
self.cancelCall()
self.delayedCall = reactor.callLater(1, self.resetPausedFlag)
log('producer is requesting to be resumed')
def stopProducing(self):
self.isPaused = True
self.isStopped = True
log('producer is requesting to be stopped')
MJPEG_SEP = '--spionisto\r\n'
class JpegStreamReader(protocol.Protocol):
def __init__(self):
self.tnow = None
def connectionMade(self):
self.data = ''
self.tnow = datetime.now()
def dataReceived(self, data):
self.data += data
chunks = self.data.rsplit(MJPEG_SEP, 1)
dataToSend = ''
if len(chunks) == 2:
dataToSend = chunks[0] + MJPEG_SEP
self.data = chunks[-1]
for producer in self.factory.queues:
if (not producer.isPaused):
producer.request.write(dataToSend)