在 linux/bash 中使用非阻塞 FIFO 流式传输视频
Streaming video using a non-blocking FIFO in linux/bash
我正在努力实现以下目标:
- 将来自我的 Raspberry Pi 相机的视频写入磁盘,不受流式传输的任何干扰
- 通过优化延迟的网络流式传输同一视频
重要的是流媒体不会干扰正在写入磁盘的视频,因为网络连接可能不稳定,例如 WiFi 路由器可能超出范围等。
为了做到这一点,我尝试的第一件事是:
#Receiver side
FPS="30"
netcat -l -p 5000 | mplayer -vf scale -zoom -xy 1280 -fps $FPS -cache-min 50 -cache 1024 - &
#RPi side
FPS="30"
mkfifo netcat_fifo
raspivid -t 0 -md 5 -fps $FPS -o - | tee --output-error=warn netcat_fifo > $video_out &
cat netcat_fifo | netcat -v 192.168.0.101 5000 &> $netcat_log &
而且流式传输效果很好。但是,当我关闭路由器时,模拟网络问题,我的 $video_out 被切断了。我认为这是由于 netcat_fifo.
的背压造成的
我在 stackexchange 上找到了一个关于非阻塞 FIFO 的解决方案,方法是用 ftee 替换 tee:
Linux non-blocking fifo (on demand logging)
现在可以防止我的$video_out受到串流的影响,但是串流本身很不稳定。最好的结果是使用以下脚本:
#RPi side
FPS="30"
MULTIPIPE="ftee"
mkfifo netcat_fifo
raspivid -t 0 -md 5 -fps $FPS -o - | ./${MULTIPIPE} netcat_fifo > $video_out &
cat netcat_fifo | mbuffer --direct -t -s 2k 2> $mbuffer_log | netcat -v 192.168.0.101 5000 &> $netcat_log &
当我检查 mbuffer 日志时,我诊断出一个 FIFO 大部分时间都是空的,但有 99-100% 的利用率峰值。在这些峰值期间,我的 mplayer 接收端在解码视频时出现许多错误,需要大约 5 秒才能自行恢复。在此间隔之后,mbuffer 日志再次显示一个空的 FIFO。
empty->full->empty 继续下去。
我有两个问题:
- 我是否使用了正确的方法来解决我的问题?
- 如果是这样,我如何在保持 $video_out 文件完好无损的同时使我的流式传输更强大?
我对此进行了一些尝试,它似乎在我的 Raspberry Pi 3 上工作得非常稳固。评论很好,所以应该很容易理解,但你可以随时询问是否有任何问题。
基本上有3个线程:
主程序 - 它不断地从raspivid
读取它的stdin
并循环地将数据放入一堆缓冲区
磁盘写入器线程 - 它不断循环缓冲区列表,等待下一个缓冲区变满。当缓冲区已满时,它将内容写入磁盘,将缓冲区标记为已写入并移动到下一个
fifo writer thread - 它不断循环缓冲区列表,等待下一个缓冲区变满。当缓冲区已满时,它将内容写入 fifo,刷新 fifo 以减少滞后并将缓冲区标记为已写入并移至下一个缓冲区。忽略错误。
所以,这是代码:
////////////////////////////////////////////////////////////////////////////////
// main.cpp
// Mark Setchell
//
// Read video stream from "raspivid" and write (independently) to both disk file
// and stdout - for onward netcatting to another host.
//
// Compiles with:
// g++ main.cpp -o main -lpthread
//
// Run on Raspberry Pi with:
// raspivid -t 0 -md 5 -fps 30 -o - | ./main video.h264 | netcat -v 192.168.0.8 5000
//
// Receive on other host with:
// netcat -l -p 5000 | mplayer -vf scale -zoom -xy 1280 -fps 30 -cache-min 50 -cache 1024 -
////////////////////////////////////////////////////////////////////////////////
#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
#include <unistd.h>
#include <atomic>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define BUFSZ 65536
#define NBUFS 64
class Buffer{
public:
int bytes=0;
std::atomic<int> NeedsWriteToDisk{0};
std::atomic<int> NeedsWriteToFifo{0};
unsigned char data[BUFSZ];
};
std::vector<Buffer> buffers(NBUFS);
////////////////////////////////////////////////////////////////////////////////
// This is the DiskWriter thread.
// It loops through all the buffers waiting in turn for each one to become ready
// then writes it to disk and marks the buffer as written before moving to next
// buffer.
////////////////////////////////////////////////////////////////////////////////
void DiskWriter(char* filename){
int bufIndex=0;
// Open output file
int fd=open(filename,O_CREAT|O_WRONLY|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if(fd==-1)
{
std::cerr << "ERROR: Unable to open output file" << std::endl;
exit(EXIT_FAILURE);
}
bool Error=false;
while(!Error){
// Wait for buffer to be filled by main thread
while(buffers[bufIndex].NeedsWriteToDisk!=1){
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// Write to disk
int bytesToWrite=buffers[bufIndex].bytes;
int bytesWritten=write(fd,reinterpret_cast<unsigned char*>(&buffers[bufIndex].data),bytesToWrite);
if(bytesWritten!=bytesToWrite){
std::cerr << "ERROR: Unable to write to disk" << std::endl;
exit(EXIT_FAILURE);
}
// Mark buffer as written
buffers[bufIndex].NeedsWriteToDisk=0;
// Move to next buffer
bufIndex=(bufIndex+1)%NBUFS;
}
}
////////////////////////////////////////////////////////////////////////////////
// This is the FifoWriter thread.
// It loops through all the buffers waiting in turn for each one to become ready
// then writes it to the Fifo, flushes it for reduced lag, and marks the buffer
// as written before moving to next one. Errors are ignored.
////////////////////////////////////////////////////////////////////////////////
void FifoWriter(){
int bufIndex=0;
bool Error=false;
while(!Error){
// Wait for buffer to be filled by main thread
while(buffers[bufIndex].NeedsWriteToFifo!=1){
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// Write to fifo
int bytesToWrite=buffers[bufIndex].bytes;
int bytesWritten=write(STDOUT_FILENO,reinterpret_cast<unsigned char*>(&buffers[bufIndex].data),bytesToWrite);
if(bytesWritten!=bytesToWrite){
std::cerr << "ERROR: Unable to write to fifo" << std::endl;
}
// Try to reduce lag
fflush(stdout);
// Mark buffer as written
buffers[bufIndex].NeedsWriteToFifo=0;
// Move to next buffer
bufIndex=(bufIndex+1)%NBUFS;
}
}
int main(int argc, char *argv[])
{
int bufIndex=0;
if(argc!=2){
std::cerr << "ERROR: Usage " << argv[0] << " filename" << std::endl;
exit(EXIT_FAILURE);
}
char * filename = argv[1];
// Start disk and fifo writing threads in parallel
std::thread tDiskWriter(DiskWriter,filename);
std::thread tFifoWriter(FifoWriter);
bool Error=false;
// Continuously fill buffers from "raspivid" on stdin. Mark as full and
// needing output to disk and fifo before moving to next buffer.
while(!Error)
{
// Check disk writer is not behind before re-using buffer
if(buffers[bufIndex].NeedsWriteToDisk==1){
std::cerr << "ERROR: Disk writer is behind by " << NBUFS << " buffers" << std::endl;
}
// Check fifo writer is not behind before re-using buffer
if(buffers[bufIndex].NeedsWriteToFifo==1){
std::cerr << "ERROR: Fifo writer is behind by " << NBUFS << " buffers" << std::endl;
}
// Read from STDIN till buffer is pretty full
int bytes;
int totalBytes=0;
int bytesToRead=BUFSZ;
unsigned char* ptr=reinterpret_cast<unsigned char*>(&buffers[bufIndex].data);
while(totalBytes<(BUFSZ*.75)){
bytes = read(STDIN_FILENO,ptr,bytesToRead);
if(bytes<=0){
Error=true;
break;
}
ptr+=bytes;
totalBytes+=bytes;
bytesToRead-=bytes;
}
// Signal buffer ready for writing
buffers[bufIndex].bytes=totalBytes;
buffers[bufIndex].NeedsWriteToDisk=1;
buffers[bufIndex].NeedsWriteToFifo=1;
// Move to next buffer
bufIndex=(bufIndex+1)%NBUFS;
}
}
我正在努力实现以下目标:
- 将来自我的 Raspberry Pi 相机的视频写入磁盘,不受流式传输的任何干扰
- 通过优化延迟的网络流式传输同一视频
重要的是流媒体不会干扰正在写入磁盘的视频,因为网络连接可能不稳定,例如 WiFi 路由器可能超出范围等。
为了做到这一点,我尝试的第一件事是:
#Receiver side
FPS="30"
netcat -l -p 5000 | mplayer -vf scale -zoom -xy 1280 -fps $FPS -cache-min 50 -cache 1024 - &
#RPi side
FPS="30"
mkfifo netcat_fifo
raspivid -t 0 -md 5 -fps $FPS -o - | tee --output-error=warn netcat_fifo > $video_out &
cat netcat_fifo | netcat -v 192.168.0.101 5000 &> $netcat_log &
而且流式传输效果很好。但是,当我关闭路由器时,模拟网络问题,我的 $video_out 被切断了。我认为这是由于 netcat_fifo.
的背压造成的我在 stackexchange 上找到了一个关于非阻塞 FIFO 的解决方案,方法是用 ftee 替换 tee:
Linux non-blocking fifo (on demand logging)
现在可以防止我的$video_out受到串流的影响,但是串流本身很不稳定。最好的结果是使用以下脚本:
#RPi side
FPS="30"
MULTIPIPE="ftee"
mkfifo netcat_fifo
raspivid -t 0 -md 5 -fps $FPS -o - | ./${MULTIPIPE} netcat_fifo > $video_out &
cat netcat_fifo | mbuffer --direct -t -s 2k 2> $mbuffer_log | netcat -v 192.168.0.101 5000 &> $netcat_log &
当我检查 mbuffer 日志时,我诊断出一个 FIFO 大部分时间都是空的,但有 99-100% 的利用率峰值。在这些峰值期间,我的 mplayer 接收端在解码视频时出现许多错误,需要大约 5 秒才能自行恢复。在此间隔之后,mbuffer 日志再次显示一个空的 FIFO。 empty->full->empty 继续下去。
我有两个问题:
- 我是否使用了正确的方法来解决我的问题?
- 如果是这样,我如何在保持 $video_out 文件完好无损的同时使我的流式传输更强大?
我对此进行了一些尝试,它似乎在我的 Raspberry Pi 3 上工作得非常稳固。评论很好,所以应该很容易理解,但你可以随时询问是否有任何问题。
基本上有3个线程:
主程序 - 它不断地从
raspivid
读取它的stdin
并循环地将数据放入一堆缓冲区磁盘写入器线程 - 它不断循环缓冲区列表,等待下一个缓冲区变满。当缓冲区已满时,它将内容写入磁盘,将缓冲区标记为已写入并移动到下一个
fifo writer thread - 它不断循环缓冲区列表,等待下一个缓冲区变满。当缓冲区已满时,它将内容写入 fifo,刷新 fifo 以减少滞后并将缓冲区标记为已写入并移至下一个缓冲区。忽略错误。
所以,这是代码:
////////////////////////////////////////////////////////////////////////////////
// main.cpp
// Mark Setchell
//
// Read video stream from "raspivid" and write (independently) to both disk file
// and stdout - for onward netcatting to another host.
//
// Compiles with:
// g++ main.cpp -o main -lpthread
//
// Run on Raspberry Pi with:
// raspivid -t 0 -md 5 -fps 30 -o - | ./main video.h264 | netcat -v 192.168.0.8 5000
//
// Receive on other host with:
// netcat -l -p 5000 | mplayer -vf scale -zoom -xy 1280 -fps 30 -cache-min 50 -cache 1024 -
////////////////////////////////////////////////////////////////////////////////
#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
#include <unistd.h>
#include <atomic>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define BUFSZ 65536
#define NBUFS 64
class Buffer{
public:
int bytes=0;
std::atomic<int> NeedsWriteToDisk{0};
std::atomic<int> NeedsWriteToFifo{0};
unsigned char data[BUFSZ];
};
std::vector<Buffer> buffers(NBUFS);
////////////////////////////////////////////////////////////////////////////////
// This is the DiskWriter thread.
// It loops through all the buffers waiting in turn for each one to become ready
// then writes it to disk and marks the buffer as written before moving to next
// buffer.
////////////////////////////////////////////////////////////////////////////////
void DiskWriter(char* filename){
int bufIndex=0;
// Open output file
int fd=open(filename,O_CREAT|O_WRONLY|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if(fd==-1)
{
std::cerr << "ERROR: Unable to open output file" << std::endl;
exit(EXIT_FAILURE);
}
bool Error=false;
while(!Error){
// Wait for buffer to be filled by main thread
while(buffers[bufIndex].NeedsWriteToDisk!=1){
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// Write to disk
int bytesToWrite=buffers[bufIndex].bytes;
int bytesWritten=write(fd,reinterpret_cast<unsigned char*>(&buffers[bufIndex].data),bytesToWrite);
if(bytesWritten!=bytesToWrite){
std::cerr << "ERROR: Unable to write to disk" << std::endl;
exit(EXIT_FAILURE);
}
// Mark buffer as written
buffers[bufIndex].NeedsWriteToDisk=0;
// Move to next buffer
bufIndex=(bufIndex+1)%NBUFS;
}
}
////////////////////////////////////////////////////////////////////////////////
// This is the FifoWriter thread.
// It loops through all the buffers waiting in turn for each one to become ready
// then writes it to the Fifo, flushes it for reduced lag, and marks the buffer
// as written before moving to next one. Errors are ignored.
////////////////////////////////////////////////////////////////////////////////
void FifoWriter(){
int bufIndex=0;
bool Error=false;
while(!Error){
// Wait for buffer to be filled by main thread
while(buffers[bufIndex].NeedsWriteToFifo!=1){
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// Write to fifo
int bytesToWrite=buffers[bufIndex].bytes;
int bytesWritten=write(STDOUT_FILENO,reinterpret_cast<unsigned char*>(&buffers[bufIndex].data),bytesToWrite);
if(bytesWritten!=bytesToWrite){
std::cerr << "ERROR: Unable to write to fifo" << std::endl;
}
// Try to reduce lag
fflush(stdout);
// Mark buffer as written
buffers[bufIndex].NeedsWriteToFifo=0;
// Move to next buffer
bufIndex=(bufIndex+1)%NBUFS;
}
}
int main(int argc, char *argv[])
{
int bufIndex=0;
if(argc!=2){
std::cerr << "ERROR: Usage " << argv[0] << " filename" << std::endl;
exit(EXIT_FAILURE);
}
char * filename = argv[1];
// Start disk and fifo writing threads in parallel
std::thread tDiskWriter(DiskWriter,filename);
std::thread tFifoWriter(FifoWriter);
bool Error=false;
// Continuously fill buffers from "raspivid" on stdin. Mark as full and
// needing output to disk and fifo before moving to next buffer.
while(!Error)
{
// Check disk writer is not behind before re-using buffer
if(buffers[bufIndex].NeedsWriteToDisk==1){
std::cerr << "ERROR: Disk writer is behind by " << NBUFS << " buffers" << std::endl;
}
// Check fifo writer is not behind before re-using buffer
if(buffers[bufIndex].NeedsWriteToFifo==1){
std::cerr << "ERROR: Fifo writer is behind by " << NBUFS << " buffers" << std::endl;
}
// Read from STDIN till buffer is pretty full
int bytes;
int totalBytes=0;
int bytesToRead=BUFSZ;
unsigned char* ptr=reinterpret_cast<unsigned char*>(&buffers[bufIndex].data);
while(totalBytes<(BUFSZ*.75)){
bytes = read(STDIN_FILENO,ptr,bytesToRead);
if(bytes<=0){
Error=true;
break;
}
ptr+=bytes;
totalBytes+=bytes;
bytesToRead-=bytes;
}
// Signal buffer ready for writing
buffers[bufIndex].bytes=totalBytes;
buffers[bufIndex].NeedsWriteToDisk=1;
buffers[bufIndex].NeedsWriteToFifo=1;
// Move to next buffer
bufIndex=(bufIndex+1)%NBUFS;
}
}