多个RTSP接收方法
Multiple RTSPs receive method
我正在尝试编写一个项目,我至少要同时访问 20 个 rtsp CCTV URL。
我尝试使用 ffmpeg 的多输入(multiple inputs)方式来实现这一目标。但是,有一个问题。
ffmpeg -i URL_1 -i URL_2 -
上面的命令是我试过的例子。我希望可以通过 ffmpeg 访问两个 RTSP 流,并将它们分别输出到两个不同的队列中以供后续使用。但如果使用此命令再读取字节,我就无法区分哪些字节属于哪个 RTSP 输入。
有没有其他方法可以同时访问更多的rtsp?
编辑: 添加代码
import ffmpeg
import numpy as np
import subprocess as sp
import threading
import queue
import time
class CCTVReader(threading.Thread):
    """Thread that pulls a raw H.264 stream from one input through FFmpeg.

    FFmpeg is started as a child process that copies the video stream of
    ``in_stream`` to its stdout. ``run`` reads that pipe in 1024-byte chunks,
    puts the chunks into ``q`` and finally dumps the queue to
    ``<name>.h264`` (for testing).

    Args:
        q: Queue that receives the byte chunks read from FFmpeg.
        in_stream: Value passed to FFmpeg's ``-i`` (RTSP URL or file path).
        name: Thread name; also the base name of the ``.h264`` dump file.
    """

    def __init__(self, q, in_stream, name):
        super().__init__()
        self.name = name
        self.q = q
        self.command = ["ffmpeg",
                        "-c:v", "h264",   # Tell FFmpeg that the input stream codec is h264
                        "-i", in_stream,  # Input stream (RTSP URL or file)
                        "-c:v", "copy",   # Copy the video stream as is (no decoding and encoding)
                        "-an", "-sn",     # No audio and no subtitles
                        "-f", "h264",     # Define pipe format to be h264
                        "-"]              # Output is a pipe (stdout)

    def run(self):
        """Read chunks from FFmpeg into the queue, then dump the queue to a file."""
        # Don't use shell=True (the command does not need to go through the shell).
        # NOTE(review): bufsize=1024**3 requests a 1 GiB buffer for the pipe
        # file object of every reader; with many cameras this may exhaust
        # memory. A much smaller value is likely sufficient.
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**3)
        try:
            # while True:
            # Read at most 10 MiB (10240 chunks of 1024 bytes) for testing.
            for _ in range(1024*10):
                data = pipe.stdout.read(1024)  # Read data from the pipe in chunks of 1024 bytes
                # Skip the empty chunk returned at end of stream; enqueueing it
                # would make the "queue is empty" check below always pass.
                if data:
                    self.q.put(data)
                # Break loop if less than 1024 bytes were read (end of input).
                if len(data) < 1024:
                    break
        finally:
            # Always release the pipe and reap the child, even if reading failed.
            pipe.stdout.close()
            try:
                pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
            except sp.TimeoutExpired:
                pipe.kill()  # A live input stream keeps FFmpeg running, so a timeout is expected.
                pipe.wait()  # Collect the exit status so no zombie process is left behind.

        if self.q.empty():
            print("There is a problem (queue is empty)!!!")
        else:
            # Write data from the queue to <name>.h264 (for testing)
            with open(self.name+".h264", "wb") as queue_save_file:
                while not self.q.empty():
                    queue_save_file.write(self.q.get())
# Build synthetic video, for testing begins:
################################################
# width, height = 1280, 720
# in_stream = "vid.264"
# sp.Popen("ffmpeg -y -f lavfi -i testsrc=size=1280x720:duration=5:rate=1 -c:v libx264 -crf 23 -pix_fmt yuv420p " + in_stream).wait()
################################################
#Use public RTSP Streaming for testing
readers = {}
queues = {}

# Camera name -> settings; "ip" is the RTSP URL handed to CCTVReader.
# (Named `streams` rather than `dict` so the builtin is not shadowed.)
streams = {
    "name1": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name2": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name3": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name4": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name5": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name6": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name7": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name8": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name9": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name10": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name11": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name12": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name13": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name14": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name15": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
}

for name, settings in streams.items():
    q = queue.Queue()
    queues[name] = q
    cctv_reader = CCTVReader(q, settings["ip"], name)
    readers[name] = cctv_reader
    cctv_reader.start()
    # NOTE: join() blocks until this reader's thread has finished, so with
    # the call placed inside the start loop the cameras are read one after
    # another rather than concurrently.
    cctv_reader.join()
您在之前的问题中已经具备了所有基础设施。
您需要做的就是为您的 'CCTVReader' class 创建多个对象。
这是一个工作代码示例,读取两个流:
import numpy as np
import subprocess as sp
import threading
import queue
import time
class CCTVReader(threading.Thread):
    """Thread that pulls a raw H.264 stream from one input through FFmpeg.

    FFmpeg is started as a child process that copies the video stream of
    ``in_stream`` to its stdout. ``run`` reads that pipe in chunks of
    ``chunk_size`` bytes and puts every non-empty chunk into ``q``.

    Args:
        q: Queue that receives the byte chunks read from FFmpeg.
        in_stream: Value passed to FFmpeg's ``-i`` (RTSP URL or file path).
        chunk_size: Number of bytes requested per read from the pipe.
    """

    def __init__(self, q, in_stream, chunk_size):
        super().__init__()
        self.q = q
        self.chunk_size = chunk_size
        self.command = ["ffmpeg",
                        "-c:v", "h264",   # Tell FFmpeg that the input stream codec is h264
                        "-i", in_stream,  # Input stream (RTSP URL or file)
                        "-c:v", "copy",   # Copy the video stream as is (no decoding and encoding)
                        "-an", "-sn",     # No audio and no subtitles
                        "-f", "h264",     # Define pipe format to be h264
                        "-"]              # Output is a pipe (stdout)

    def run(self):
        """Read up to 100 chunks from FFmpeg's stdout into the queue."""
        # Don't use shell=True (the command does not need to go through the shell).
        # 100 MiB pipe buffer: a 1 GiB buffer per reader can exhaust memory
        # once many readers run at the same time.
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**2*100)
        try:
            # while True:
            for _ in range(100):  # Read at most 100 chunks for testing
                data = pipe.stdout.read(self.chunk_size)
                # Skip the empty chunk returned at end of stream, so that an
                # input that produced no data leaves the queue truly empty.
                if data:
                    self.q.put(data)
                # A short read means end of input (works with a file; a live
                # CCTV stream normally never gets here).
                if len(data) < self.chunk_size:
                    break
        finally:
            # Always release the pipe and reap the child, even if reading failed.
            pipe.stdout.close()
            try:
                pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
            except sp.TimeoutExpired:
                pipe.kill()  # A live input stream keeps FFmpeg running, so a timeout is expected.
                pipe.wait()  # Collect the exit status so no zombie process is left behind.
#in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast",
#Use public RTSP Streaming for testing
# Both readers pull the same public RTSP test stream.
in_stream1 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"
in_stream2 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"

q1 = queue.Queue()
q2 = queue.Queue()

cctv_reader1 = CCTVReader(q1, in_stream1, 1024)  # First stream
cctv_reader2 = CCTVReader(q2, in_stream2, 2048)  # Second stream

# Start the readers 5 seconds apart (for testing).
cctv_reader1.start()
time.sleep(5)
cctv_reader2.start()

# Wait for both reader threads to finish.
for reader in (cctv_reader1, cctv_reader2):
    reader.join()

# Dump each queue to its own file (for testing).
for chunks, dump_path, empty_message in (
    (q1, "vid_from_q1.264", "There is a problem (q1 is empty)!!!"),
    (q2, "vid_from_q2.264", "There is a problem (q2 is empty)!!!"),
):
    if chunks.empty():
        print(empty_message)
        continue
    with open(dump_path, "wb") as queue_save_file:
        while not chunks.empty():
            queue_save_file.write(chunks.get())
更新:
我认为您不能使用 ffmpeg -i URL_1 -i URL_2 - ... 这样的语法。
您发布的代码有几个问题:
- cctv_reader.join() 必须放在第二个循环中,因为它会等待线程结束,从而阻塞后续线程的启动。
- 保存数据到文件应该在所有线程结束之后(只是为了测试)。
如果您想记录数据,请尝试在抓取数据后立即保存每个块。
- 减小 bufsize=1024**3 的大小,例如尝试 bufsize=1024**2*100。
如果 OS 实际上为每个进程分配 1GB 的缓冲区大小,您可能会出现内存不足的情况。
注意:Python多线程性能不太好,检查CPU负载。
这是一个代码示例(从文件中读取):
import numpy as np
import subprocess as sp
import threading
import queue
class CCTVReader(threading.Thread):
    """Thread that pulls a raw H.264 stream from one input through FFmpeg.

    FFmpeg is started as a child process that copies the video stream of
    ``in_stream`` to its stdout. ``run`` reads that pipe in chunks of
    ``chunk_size`` bytes and puts every non-empty chunk into ``q``;
    ``save_q_to_file`` drains the queue into a file afterwards.

    Args:
        q: Queue that receives the byte chunks read from FFmpeg.
        in_stream: Value passed to FFmpeg's ``-i`` (RTSP URL or file path).
        chunk_size: Number of bytes requested per read from the pipe.
    """

    def __init__(self, q, in_stream, chunk_size):
        super().__init__()
        self.q = q
        self.chunk_size = chunk_size
        self.command = ["ffmpeg",
                        "-c:v", "h264",   # Tell FFmpeg that the input stream codec is h264
                        "-i", in_stream,  # Input stream (RTSP URL or file)
                        "-c:v", "copy",   # Copy the video stream as is (no decoding and encoding)
                        "-an", "-sn",     # No audio and no subtitles
                        "-f", "h264",     # Define pipe format to be h264
                        "-"]              # Output is a pipe (stdout)

    def run(self):
        """Read up to 100 chunks from FFmpeg's stdout into the queue."""
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**2*100)
        try:
            # while True:
            for _ in range(100):  # Read at most 100 chunks for testing
                data = pipe.stdout.read(self.chunk_size)
                # Skip the empty chunk returned at end of stream, so that an
                # input that produced no data leaves the queue truly empty
                # and save_q_to_file can report it.
                if data:
                    self.q.put(data)
                # A short read means end of input (works with a file; a live
                # CCTV stream normally never gets here).
                if len(data) < self.chunk_size:
                    break
        finally:
            # Always release the pipe and reap the child, even if reading failed.
            pipe.stdout.close()
            try:
                pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
            except sp.TimeoutExpired:
                pipe.kill()  # A live input stream keeps FFmpeg running, so a timeout is expected.
                pipe.wait()  # Collect the exit status so no zombie process is left behind.

    def save_q_to_file(self, vid_file_name):
        """Drain the queue into ``vid_file_name`` (for testing).

        Prints a diagnostic and writes nothing when the queue is empty.
        """
        if self.q.empty():
            print("There is a problem (q is empty)!!!")
            return
        with open(vid_file_name, "wb") as queue_save_file:
            while not self.q.empty():
                queue_save_file.write(self.q.get())
#in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast",
#Use public RTSP Streaming for testing
#in_stream = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"
#Use public RTSP Streaming for testing
readers = {}
queues = {}

# Read from files (for testing): "ip" is the input handed to CCTVReader,
# "fname" is the file the reader's queue is dumped to afterwards.
# (Named `streams` rather than `dict` so the builtin is not shadowed.)
streams = {
    "name1": {"ip": "vid1.264", "fname": "vid_from_q1.264"},
    "name2": {"ip": "vid2.264", "fname": "vid_from_q2.264"},
    "name3": {"ip": "vid3.264", "fname": "vid_from_q3.264"},
    "name4": {"ip": "vid4.264", "fname": "vid_from_q4.264"},
    "name5": {"ip": "vid5.264", "fname": "vid_from_q5.264"},
    "name6": {"ip": "vid6.264", "fname": "vid_from_q6.264"},
    "name7": {"ip": "vid7.264", "fname": "vid_from_q7.264"},
    "name8": {"ip": "vid8.264", "fname": "vid_from_q8.264"},
    "name9": {"ip": "vid9.264", "fname": "vid_from_q9.264"},
    "name10": {"ip": "vid10.264", "fname": "vid_from_q10.264"},
    "name11": {"ip": "vid11.264", "fname": "vid_from_q11.264"},
    "name12": {"ip": "vid12.264", "fname": "vid_from_q12.264"},
    "name13": {"ip": "vid13.264", "fname": "vid_from_q13.264"},
    "name14": {"ip": "vid14.264", "fname": "vid_from_q14.264"},
    "name15": {"ip": "vid15.264", "fname": "vid_from_q15.264"},
}

# Start one reader thread (with its own queue) per stream.
for name, settings in streams.items():
    q = queue.Queue()
    queues[name] = q
    cctv_reader = CCTVReader(q, settings["ip"], 8192)
    readers[name] = cctv_reader
    cctv_reader.start()

# Wait for all threads to end (joined only after every thread was started,
# so the readers run concurrently).
for cctv_reader in readers.values():
    cctv_reader.join()

# Save data for testing
for name, cctv_reader in readers.items():
    file_name = streams[name]["fname"]
    cctv_reader.save_q_to_file(file_name)
我正在尝试编写一个项目,我至少要同时访问 20 个 rtsp CCTV URL。
我尝试使用 ffmpeg 的多输入(multiple inputs)方式来实现这一目标。但是,有一个问题。
ffmpeg -i URL_1 -i URL_2 -
上面的命令是我试过的例子。我希望可以通过 ffmpeg 访问两个 RTSP 流,并将它们分别输出到两个不同的队列中以供后续使用。但如果使用此命令再读取字节,我就无法区分哪些字节属于哪个 RTSP 输入。
有没有其他方法可以同时访问更多的rtsp?
编辑: 添加代码
import ffmpeg
import numpy as np
import subprocess as sp
import threading
import queue
import time
class CCTVReader(threading.Thread):
    """Thread that pulls a raw H.264 stream from one input through FFmpeg.

    FFmpeg is started as a child process that copies the video stream of
    ``in_stream`` to its stdout. ``run`` reads that pipe in 1024-byte chunks,
    puts the chunks into ``q`` and finally dumps the queue to
    ``<name>.h264`` (for testing).

    Args:
        q: Queue that receives the byte chunks read from FFmpeg.
        in_stream: Value passed to FFmpeg's ``-i`` (RTSP URL or file path).
        name: Thread name; also the base name of the ``.h264`` dump file.
    """

    def __init__(self, q, in_stream, name):
        super().__init__()
        self.name = name
        self.q = q
        self.command = ["ffmpeg",
                        "-c:v", "h264",   # Tell FFmpeg that the input stream codec is h264
                        "-i", in_stream,  # Input stream (RTSP URL or file)
                        "-c:v", "copy",   # Copy the video stream as is (no decoding and encoding)
                        "-an", "-sn",     # No audio and no subtitles
                        "-f", "h264",     # Define pipe format to be h264
                        "-"]              # Output is a pipe (stdout)

    def run(self):
        """Read chunks from FFmpeg into the queue, then dump the queue to a file."""
        # Don't use shell=True (the command does not need to go through the shell).
        # NOTE(review): bufsize=1024**3 requests a 1 GiB buffer for the pipe
        # file object of every reader; with many cameras this may exhaust
        # memory. A much smaller value is likely sufficient.
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**3)
        try:
            # while True:
            # Read at most 10 MiB (10240 chunks of 1024 bytes) for testing.
            for _ in range(1024*10):
                data = pipe.stdout.read(1024)  # Read data from the pipe in chunks of 1024 bytes
                # Skip the empty chunk returned at end of stream; enqueueing it
                # would make the "queue is empty" check below always pass.
                if data:
                    self.q.put(data)
                # Break loop if less than 1024 bytes were read (end of input).
                if len(data) < 1024:
                    break
        finally:
            # Always release the pipe and reap the child, even if reading failed.
            pipe.stdout.close()
            try:
                pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
            except sp.TimeoutExpired:
                pipe.kill()  # A live input stream keeps FFmpeg running, so a timeout is expected.
                pipe.wait()  # Collect the exit status so no zombie process is left behind.

        if self.q.empty():
            print("There is a problem (queue is empty)!!!")
        else:
            # Write data from the queue to <name>.h264 (for testing)
            with open(self.name+".h264", "wb") as queue_save_file:
                while not self.q.empty():
                    queue_save_file.write(self.q.get())
# Build synthetic video, for testing begins:
################################################
# width, height = 1280, 720
# in_stream = "vid.264"
# sp.Popen("ffmpeg -y -f lavfi -i testsrc=size=1280x720:duration=5:rate=1 -c:v libx264 -crf 23 -pix_fmt yuv420p " + in_stream).wait()
################################################
#Use public RTSP Streaming for testing
readers = {}
queues = {}

# Camera name -> settings; "ip" is the RTSP URL handed to CCTVReader.
# (Named `streams` rather than `dict` so the builtin is not shadowed.)
streams = {
    "name1": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name2": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name3": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name4": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name5": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name6": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name7": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name8": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name9": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name10": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name11": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name12": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name13": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name14": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
    "name15": {"ip": "rtsp://xxx.xxx.xxx.xxx/"},
}

for name, settings in streams.items():
    q = queue.Queue()
    queues[name] = q
    cctv_reader = CCTVReader(q, settings["ip"], name)
    readers[name] = cctv_reader
    cctv_reader.start()
    # NOTE: join() blocks until this reader's thread has finished, so with
    # the call placed inside the start loop the cameras are read one after
    # another rather than concurrently.
    cctv_reader.join()
您在之前的问题中已经具备了所有基础设施。
您需要做的就是为您的 'CCTVReader' class 创建多个对象。
这是一个工作代码示例,读取两个流:
import numpy as np
import subprocess as sp
import threading
import queue
import time
class CCTVReader(threading.Thread):
    """Thread that pulls a raw H.264 stream from one input through FFmpeg.

    FFmpeg is started as a child process that copies the video stream of
    ``in_stream`` to its stdout. ``run`` reads that pipe in chunks of
    ``chunk_size`` bytes and puts every non-empty chunk into ``q``.

    Args:
        q: Queue that receives the byte chunks read from FFmpeg.
        in_stream: Value passed to FFmpeg's ``-i`` (RTSP URL or file path).
        chunk_size: Number of bytes requested per read from the pipe.
    """

    def __init__(self, q, in_stream, chunk_size):
        super().__init__()
        self.q = q
        self.chunk_size = chunk_size
        self.command = ["ffmpeg",
                        "-c:v", "h264",   # Tell FFmpeg that the input stream codec is h264
                        "-i", in_stream,  # Input stream (RTSP URL or file)
                        "-c:v", "copy",   # Copy the video stream as is (no decoding and encoding)
                        "-an", "-sn",     # No audio and no subtitles
                        "-f", "h264",     # Define pipe format to be h264
                        "-"]              # Output is a pipe (stdout)

    def run(self):
        """Read up to 100 chunks from FFmpeg's stdout into the queue."""
        # Don't use shell=True (the command does not need to go through the shell).
        # 100 MiB pipe buffer: a 1 GiB buffer per reader can exhaust memory
        # once many readers run at the same time.
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**2*100)
        try:
            # while True:
            for _ in range(100):  # Read at most 100 chunks for testing
                data = pipe.stdout.read(self.chunk_size)
                # Skip the empty chunk returned at end of stream, so that an
                # input that produced no data leaves the queue truly empty.
                if data:
                    self.q.put(data)
                # A short read means end of input (works with a file; a live
                # CCTV stream normally never gets here).
                if len(data) < self.chunk_size:
                    break
        finally:
            # Always release the pipe and reap the child, even if reading failed.
            pipe.stdout.close()
            try:
                pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
            except sp.TimeoutExpired:
                pipe.kill()  # A live input stream keeps FFmpeg running, so a timeout is expected.
                pipe.wait()  # Collect the exit status so no zombie process is left behind.
#in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast",
#Use public RTSP Streaming for testing
# Both readers pull the same public RTSP test stream.
in_stream1 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"
in_stream2 = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"

q1 = queue.Queue()
q2 = queue.Queue()

cctv_reader1 = CCTVReader(q1, in_stream1, 1024)  # First stream
cctv_reader2 = CCTVReader(q2, in_stream2, 2048)  # Second stream

# Start the readers 5 seconds apart (for testing).
cctv_reader1.start()
time.sleep(5)
cctv_reader2.start()

# Wait for both reader threads to finish.
for reader in (cctv_reader1, cctv_reader2):
    reader.join()

# Dump each queue to its own file (for testing).
for chunks, dump_path, empty_message in (
    (q1, "vid_from_q1.264", "There is a problem (q1 is empty)!!!"),
    (q2, "vid_from_q2.264", "There is a problem (q2 is empty)!!!"),
):
    if chunks.empty():
        print(empty_message)
        continue
    with open(dump_path, "wb") as queue_save_file:
        while not chunks.empty():
            queue_save_file.write(chunks.get())
更新:
我认为您不能使用 ffmpeg -i URL_1 -i URL_2 - ... 这样的语法。
您发布的代码有几个问题:
- cctv_reader.join() 必须放在第二个循环中,因为它会等待线程结束,从而阻塞后续线程的启动。
- 保存数据到文件应该在所有线程结束之后(只是为了测试)。如果您想记录数据,请尝试在抓取数据后立即保存每个块。
- 减小 bufsize=1024**3 的大小,例如尝试 bufsize=1024**2*100。
如果 OS 实际上为每个进程分配 1GB 的缓冲区大小,您可能会出现内存不足的情况。
注意:Python多线程性能不太好,检查CPU负载。
这是一个代码示例(从文件中读取):
import numpy as np
import subprocess as sp
import threading
import queue
class CCTVReader(threading.Thread):
    """Thread that pulls a raw H.264 stream from one input through FFmpeg.

    FFmpeg is started as a child process that copies the video stream of
    ``in_stream`` to its stdout. ``run`` reads that pipe in chunks of
    ``chunk_size`` bytes and puts every non-empty chunk into ``q``;
    ``save_q_to_file`` drains the queue into a file afterwards.

    Args:
        q: Queue that receives the byte chunks read from FFmpeg.
        in_stream: Value passed to FFmpeg's ``-i`` (RTSP URL or file path).
        chunk_size: Number of bytes requested per read from the pipe.
    """

    def __init__(self, q, in_stream, chunk_size):
        super().__init__()
        self.q = q
        self.chunk_size = chunk_size
        self.command = ["ffmpeg",
                        "-c:v", "h264",   # Tell FFmpeg that the input stream codec is h264
                        "-i", in_stream,  # Input stream (RTSP URL or file)
                        "-c:v", "copy",   # Copy the video stream as is (no decoding and encoding)
                        "-an", "-sn",     # No audio and no subtitles
                        "-f", "h264",     # Define pipe format to be h264
                        "-"]              # Output is a pipe (stdout)

    def run(self):
        """Read up to 100 chunks from FFmpeg's stdout into the queue."""
        pipe = sp.Popen(self.command, stdout=sp.PIPE, bufsize=1024**2*100)
        try:
            # while True:
            for _ in range(100):  # Read at most 100 chunks for testing
                data = pipe.stdout.read(self.chunk_size)
                # Skip the empty chunk returned at end of stream, so that an
                # input that produced no data leaves the queue truly empty
                # and save_q_to_file can report it.
                if data:
                    self.q.put(data)
                # A short read means end of input (works with a file; a live
                # CCTV stream normally never gets here).
                if len(data) < self.chunk_size:
                    break
        finally:
            # Always release the pipe and reap the child, even if reading failed.
            pipe.stdout.close()
            try:
                pipe.wait(timeout=1)  # Wait for subprocess to finish (with timeout of 1 second).
            except sp.TimeoutExpired:
                pipe.kill()  # A live input stream keeps FFmpeg running, so a timeout is expected.
                pipe.wait()  # Collect the exit status so no zombie process is left behind.

    def save_q_to_file(self, vid_file_name):
        """Drain the queue into ``vid_file_name`` (for testing).

        Prints a diagnostic and writes nothing when the queue is empty.
        """
        if self.q.empty():
            print("There is a problem (q is empty)!!!")
            return
        with open(vid_file_name, "wb") as queue_save_file:
            while not self.q.empty():
                queue_save_file.write(self.q.get())
#in_stream = "rtsp://xxx.xxx.xxx.xxx:xxx/Streaming/Channels/101?transportmode=multicast",
#Use public RTSP Streaming for testing
#in_stream = "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov"
#Use public RTSP Streaming for testing
readers = {}
queues = {}

# Read from files (for testing): "ip" is the input handed to CCTVReader,
# "fname" is the file the reader's queue is dumped to afterwards.
# (Named `streams` rather than `dict` so the builtin is not shadowed.)
streams = {
    "name1": {"ip": "vid1.264", "fname": "vid_from_q1.264"},
    "name2": {"ip": "vid2.264", "fname": "vid_from_q2.264"},
    "name3": {"ip": "vid3.264", "fname": "vid_from_q3.264"},
    "name4": {"ip": "vid4.264", "fname": "vid_from_q4.264"},
    "name5": {"ip": "vid5.264", "fname": "vid_from_q5.264"},
    "name6": {"ip": "vid6.264", "fname": "vid_from_q6.264"},
    "name7": {"ip": "vid7.264", "fname": "vid_from_q7.264"},
    "name8": {"ip": "vid8.264", "fname": "vid_from_q8.264"},
    "name9": {"ip": "vid9.264", "fname": "vid_from_q9.264"},
    "name10": {"ip": "vid10.264", "fname": "vid_from_q10.264"},
    "name11": {"ip": "vid11.264", "fname": "vid_from_q11.264"},
    "name12": {"ip": "vid12.264", "fname": "vid_from_q12.264"},
    "name13": {"ip": "vid13.264", "fname": "vid_from_q13.264"},
    "name14": {"ip": "vid14.264", "fname": "vid_from_q14.264"},
    "name15": {"ip": "vid15.264", "fname": "vid_from_q15.264"},
}

# Start one reader thread (with its own queue) per stream.
for name, settings in streams.items():
    q = queue.Queue()
    queues[name] = q
    cctv_reader = CCTVReader(q, settings["ip"], 8192)
    readers[name] = cctv_reader
    cctv_reader.start()

# Wait for all threads to end (joined only after every thread was started,
# so the readers run concurrently).
for cctv_reader in readers.values():
    cctv_reader.join()

# Save data for testing
for name, cctv_reader in readers.items():
    file_name = streams[name]["fname"]
    cctv_reader.save_q_to_file(file_name)