C# WebApi 中的实时 FLV 流式传输
Live FLV streaming in C# WebApi
目前我有一个使用 webapi 的实时流。通过直接从 ffmpeg 接收 flv 流并使用 PushStreamContent 将其直接发送到客户端。如果在流开始时网页已经打开,这将非常有效。问题是当我打开另一个页面或刷新此页面时,您将无法再查看流(流仍在正常发送到客户端)。我认为这是由于流的开头缺少某些内容,但我不确定该怎么做。任何指针将不胜感激。
客户端读流代码
public class VideosController : ApiController
{
public HttpResponseMessage Get()
{
var response = Request.CreateResponse();
response.Content = new PushStreamContent(WriteToStream, new MediaTypeHeaderValue("video/x-flv"));
return response;
}
private async Task WriteToStream( Stream arg1, HttpContent arg2, TransportContext arg3 )
{
//I think metadata needs to be written here but not sure how
Startup.AddSubscriber( arg1 );
await Task.Yield();
}
}
接收流然后发送给客户端的代码
while (true)
{
bytes = new byte[8024000];
int bytesRec = handler.Receive(bytes);
foreach (var subscriber in Startup.Subscribers.ToList())
{
var theSubscriber = subscriber;
try
{
await theSubscriber.WriteAsync( bytes, 0, bytesRec );
}
catch
{
Startup.Subscribers.Remove(theSubscriber);
}
}
}
我不是流媒体方面的专家,但看起来你应该关闭流,然后所有数据都会被写入
await theSubscriber.WriteAsync( bytes, 0, bytesRec );
就像WebAPI StreamContent vs PushStreamContent
中提到的那样
{
// After save we close the stream to signal that we are done writing.
xDoc.Save(stream);
stream.Close();
}
我喜欢这段代码,因为它在处理异步编程时展示了一个基本错误
while (true)
{
}
这是一个同步循环,它会尽可能快地自我循环。它每秒可以执行数千次(取决于可用的软件和硬件资源)
await theSubscriber.WriteAsync( bytes, 0, bytesRec );
这是一个在不同线程中执行的异步命令(如果还不够清楚的话)(while 循环表示主线程执行)
现在...为了使 while 循环等待异步命令,我们使用 await...听起来不错(否则 while 循环将执行数千个次,执行无数异步命令)
但是因为(订阅者的)循环需要为所有订阅者传输流同时它被 await 关键字卡住
这就是为什么重新加载/新订阅者冻结所有内容(新连接 = 新订阅者)
结论:整个for循环应该在一个Task里面。任务需要等到服务器将流发送给所有订阅者。只有这样它才应该继续使用 ContinueWith 的 while 循环(这就是为什么它这样调用,对吧?)
所以...写入命令需要在没有 await 关键字的情况下执行
theSubscriber.WriteAsync
foreach 循环应该使用一个任务,该任务在完成后继续执行 while 循环
我从来没有用过FLV,也没仔细研究过视频格式
大多数文件格式都是结构化的,尤其是视频格式。它们包含帧(即完整或部分屏幕截图,具体取决于压缩格式)。
如果您在开始向新订阅者流式传输时设法达到特定帧,那您应该真的很幸运。因此,当他们开始接收流时,他们无法识别格式,因为帧是部分的。
您可以在 wikipedia article 中阅读更多 FLV 帧。这很可能是您的问题。
一个简单的尝试是尝试保存第一个订阅者连接时从流媒体服务器收到的初始 header。
类似于:
static byte _header = new byte[9]; //signature, version, flags, headerSize
public void YourStreamMethod()
{
int bytesRec = handler.Receive(bytes);
if (!_headerIsStored)
{
//store header
Buffer.BlockCopy(bytes, 0, _header, 0, 9);
_headerIsStored = true;
}
}
.. 这允许您将 header 发送给下一个连接用户:
private async Task WriteToStream( Stream arg1, HttpContent arg2, TransportContext arg3 )
{
// send the FLV header
arg1.Write(_header, 0, 9);
Startup.AddSubscriber( arg1 );
await Task.Yield();
}
完成后,祈祷接收器将忽略部分帧。如果不是,您需要分析流以确定下一帧的位置。
为此,您需要执行以下操作:
- 创建一个
BytesLeftToNextFrame
变量。
- 将接收到的数据包header存储在缓冲区
- 将 "Payload size" 位转换为 int
- 将
BytesLeftToNextFrame
重置为解析值
- 倒计时到下一次阅读 header。
最后,当新客户端连接时,在知道下一帧到达之前不要开始流式传输。
伪代码:
var bytesLeftToNextFrame = 0;
while (true)
{
bytes = new byte[8024000];
int bytesRec = handler.Receive(bytes);
foreach (var subscriber in Startup.Subscribers.ToList())
{
var theSubscriber = subscriber;
try
{
if (subscriber.IsNew && bytesLeftToNextFrame < bytesRec)
{
//start from the index where the new frame starts
await theSubscriber.WriteAsync( bytes, bytesLeftToNextFrame, bytesRec - bytesLeftToNextFrame);
subscriber.IsNew = false;
}
else
{
//send everything, since we've already in streaming mode
await theSubscriber.WriteAsync( bytes, 0, bytesRec );
}
}
catch
{
Startup.Subscribers.Remove(theSubscriber);
}
}
//TODO: check if the current frame is done
// then parse the next header and reset the counter.
}
目前我有一个使用 webapi 的实时流。通过直接从 ffmpeg 接收 flv 流并使用 PushStreamContent 将其直接发送到客户端。如果在流开始时网页已经打开,这将非常有效。问题是当我打开另一个页面或刷新此页面时,您将无法再查看流(流仍在正常发送到客户端)。我认为这是由于流的开头缺少某些内容,但我不确定该怎么做。任何指针将不胜感激。
客户端读流代码
public class VideosController : ApiController
{
public HttpResponseMessage Get()
{
var response = Request.CreateResponse();
response.Content = new PushStreamContent(WriteToStream, new MediaTypeHeaderValue("video/x-flv"));
return response;
}
private async Task WriteToStream( Stream arg1, HttpContent arg2, TransportContext arg3 )
{
//I think metadata needs to be written here but not sure how
Startup.AddSubscriber( arg1 );
await Task.Yield();
}
}
接收流然后发送给客户端的代码
while (true)
{
bytes = new byte[8024000];
int bytesRec = handler.Receive(bytes);
foreach (var subscriber in Startup.Subscribers.ToList())
{
var theSubscriber = subscriber;
try
{
await theSubscriber.WriteAsync( bytes, 0, bytesRec );
}
catch
{
Startup.Subscribers.Remove(theSubscriber);
}
}
}
我不是流媒体方面的专家,但看起来你应该关闭流,然后所有数据都会被写入
await theSubscriber.WriteAsync( bytes, 0, bytesRec );
就像WebAPI StreamContent vs PushStreamContent
中提到的那样{
// After save we close the stream to signal that we are done writing.
xDoc.Save(stream);
stream.Close();
}
我喜欢这段代码,因为它在处理异步编程时展示了一个基本错误
while (true)
{
}
这是一个同步循环,它会尽可能快地自我循环。它每秒可以执行数千次(取决于可用的软件和硬件资源)
await theSubscriber.WriteAsync( bytes, 0, bytesRec );
这是一个在不同线程中执行的异步命令(如果还不够清楚的话)(while 循环表示主线程执行)
现在...为了使 while 循环等待异步命令,我们使用 await...听起来不错(否则 while 循环将执行数千个次,执行无数异步命令)
但是因为(订阅者的)循环需要为所有订阅者传输流同时它被 await 关键字卡住
这就是为什么重新加载/新订阅者冻结所有内容(新连接 = 新订阅者)
结论:整个for循环应该在一个Task里面。任务需要等到服务器将流发送给所有订阅者。只有这样它才应该继续使用 ContinueWith 的 while 循环(这就是为什么它这样调用,对吧?)
所以...写入命令需要在没有 await 关键字的情况下执行
theSubscriber.WriteAsync
foreach 循环应该使用一个任务,该任务在完成后继续执行 while 循环
我从来没有用过FLV,也没仔细研究过视频格式
大多数文件格式都是结构化的,尤其是视频格式。它们包含帧(即完整或部分屏幕截图,具体取决于压缩格式)。
如果您在开始向新订阅者流式传输时设法达到特定帧,那您应该真的很幸运。因此,当他们开始接收流时,他们无法识别格式,因为帧是部分的。
您可以在 wikipedia article 中阅读更多 FLV 帧。这很可能是您的问题。
一个简单的尝试是尝试保存第一个订阅者连接时从流媒体服务器收到的初始 header。
类似于:
static byte _header = new byte[9]; //signature, version, flags, headerSize
public void YourStreamMethod()
{
int bytesRec = handler.Receive(bytes);
if (!_headerIsStored)
{
//store header
Buffer.BlockCopy(bytes, 0, _header, 0, 9);
_headerIsStored = true;
}
}
.. 这允许您将 header 发送给下一个连接用户:
private async Task WriteToStream( Stream arg1, HttpContent arg2, TransportContext arg3 )
{
// send the FLV header
arg1.Write(_header, 0, 9);
Startup.AddSubscriber( arg1 );
await Task.Yield();
}
完成后,祈祷接收器将忽略部分帧。如果不是,您需要分析流以确定下一帧的位置。
为此,您需要执行以下操作:
- 创建一个
BytesLeftToNextFrame
变量。 - 将接收到的数据包header存储在缓冲区
- 将 "Payload size" 位转换为 int
- 将
BytesLeftToNextFrame
重置为解析值 - 倒计时到下一次阅读 header。
最后,当新客户端连接时,在知道下一帧到达之前不要开始流式传输。
伪代码:
var bytesLeftToNextFrame = 0;
while (true)
{
bytes = new byte[8024000];
int bytesRec = handler.Receive(bytes);
foreach (var subscriber in Startup.Subscribers.ToList())
{
var theSubscriber = subscriber;
try
{
if (subscriber.IsNew && bytesLeftToNextFrame < bytesRec)
{
//start from the index where the new frame starts
await theSubscriber.WriteAsync( bytes, bytesLeftToNextFrame, bytesRec - bytesLeftToNextFrame);
subscriber.IsNew = false;
}
else
{
//send everything, since we've already in streaming mode
await theSubscriber.WriteAsync( bytes, 0, bytesRec );
}
}
catch
{
Startup.Subscribers.Remove(theSubscriber);
}
}
//TODO: check if the current frame is done
// then parse the next header and reset the counter.
}