如何确保多个Async下载的数据按照开始的顺序保存?
How to make sure that the data of multiple Async downloads are saved in the order they were started?
我正在编写一个基本的 Http Live Stream (HLS) 下载器,我在其中以“#EXT-X-TARGETDURATION”指定的时间间隔重新下载 m3u8 媒体播放列表,然后下载 *.ts细分可用时。
这是 m3u8 媒体播放列表首次下载时的样子。
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:12
#EXT-X-MEDIA-SEQUENCE:1
#EXTINF:7.975,
http://website.com/segment_1.ts
#EXTINF:7.941,
http://website.com/segment_2.ts
#EXTINF:7.975,
http://website.com/segment_3.ts
我想使用 HttpClient async/await 同时下载这些 *.ts 片段。这些段的大小不同,因此即使先开始下载 "segment_1.ts",它也可能在其他两个段之后完成。
这些片段都是一个大视频的一部分,因此请务必按照开始顺序而不是结束顺序写入下载片段的数据。
如果分段一个接一个地下载,我下面的代码工作得很好,但当同时下载多个分段时就不行了,因为有时它们不会按照开始的顺序完成。
我考虑过使用 Task.WhenAll,它可以保证正确的顺序,但我不想将下载的段不必要地保留在内存中,因为它们的大小可能有几兆字节。如果 "segment_1.ts" 的下载确实先完成,那么应该立即将其写入磁盘,而不必等待其他段完成。将所有 *.ts 段写入单独的文件并在最后加入它们也不是一种选择,因为它需要双磁盘 space 并且整个视频的大小可能有几千兆字节。
我不知道该怎么做,我想知道是否有人可以帮助我。我正在寻找一种不需要我手动创建线程或长时间阻塞 ThreadPool 线程的方法。
一些代码和异常处理已被删除,以便更容易查看正在发生的事情。
// Async BlockingCollection from the AsyncEx library
private AsyncCollection<byte[]> segmentDataQueue = new AsyncCollection<byte[]>();
public void Start()
{
RunConsumer();
RunProducer();
}
private async void RunProducer()
{
while (!_isCancelled)
{
var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false);
var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries);
if (!lines.Any() || lines[0] != "#EXTM3U")
throw new Exception("Invalid m3u8 media playlist.");
for (var i = 1; i < lines.Length; i++)
{
var line = lines[i];
if (line.StartsWith("#EXT-X-TARGETDURATION"))
{
ParseTargetDuration(line);
}
else if (line.StartsWith("#EXT-X-MEDIA-SEQUENCE"))
{
ParseMediaSequence(line);
}
else if (!line.StartsWith("#"))
{
if (_isNewSegment)
{
// Fire and forget
DownloadTsSegment(line);
}
}
}
// Wait until it's time to reload the m3u8 media playlist again
await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false);
}
}
// async void. We never await this method, so we can download multiple segments at once
private async void DownloadTsSegment(string tsUrl)
{
var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false);
var data = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
// Add the downloaded segment data to the AsyncCollection
await segmentDataQueue.AddAsync(data, _cts.Token).ConfigureAwait(false);
}
private async void RunConsumer()
{
using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read))
{
while (!_isCancelled)
{
// Wait until new segment data is added to the AsyncCollection and write it to disk
var data = await segmentDataQueue.TakeAsync(_cts.Token).ConfigureAwait(false);
await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
}
}
}
为每个下载分配一个序列号。将结果放入Dictionary<int, byte[]>
。每次下载完成时,它都会添加自己的结果。
然后检查是否有要写入磁盘的段:
while (dict.ContainsKey(lowestWrittenSegmentNumber + 1)) {
WriteSegment(dict[lowestWrittenSegmentNumber + 1]);
lowestWrittenSegmentNumber++;
}
这样一来,所有段都按顺序并缓冲地存储在磁盘上。
RunConsumer();
RunProducer();
确保使用 async Task
以便您可以使用 await Task.WhenAll(RunConsumer(), RunProducer());
等待完成。但是你应该不再需要 RunConsumer
。
我认为您在这里根本不需要 producer/consumer 队列。但是,我认为你应该避免 "fire and forget".
您可以同时启动它们,并在它们完成时处理它们。
首先,定义如何下载单个段:
private async Task<byte[]> DownloadTsSegmentAsync(string tsUrl)
{
var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false);
return await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
}
然后添加对播放列表的解析,从而生成 列表 片段下载(已经全部在进行中):
private List<Task<byte[]>> DownloadTasks(string data)
{
var result = new List<Task<byte[]>>();
string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries);
if (!lines.Any() || lines[0] != "#EXTM3U")
throw new Exception("Invalid m3u8 media playlist.");
...
if (_isNewSegment)
{
result.Add(DownloadTsSegmentAsync(line));
}
...
return result;
}
通过写入文件一次一个(按顺序)使用此列表:
private async Task RunConsumerAsync(List<Task<byte[]>> downloads)
{
using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read))
{
for (var task in downloads)
{
var data = await task.ConfigureAwait(false);
await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
}
}
}
并与制作人一起开始:
public async Task RunAsync()
{
// TODO: consider CancellationToken instead of a boolean.
while (!_isCancelled)
{
var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false);
var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
var tasks = DownloadTasks(data);
await RunConsumerAsync(tasks);
await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false);
}
}
请注意,此解决方案会 运行 所有并发下载,这会导致内存压力。如果这是一个问题,我建议您重组以使用 TPL 数据流,它内置了对节流的支持。
我正在编写一个基本的 Http Live Stream (HLS) 下载器,我在其中以“#EXT-X-TARGETDURATION”指定的时间间隔重新下载 m3u8 媒体播放列表,然后下载 *.ts细分可用时。
这是 m3u8 媒体播放列表首次下载时的样子。
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:12
#EXT-X-MEDIA-SEQUENCE:1
#EXTINF:7.975,
http://website.com/segment_1.ts
#EXTINF:7.941,
http://website.com/segment_2.ts
#EXTINF:7.975,
http://website.com/segment_3.ts
我想使用 HttpClient async/await 同时下载这些 *.ts 片段。这些段的大小不同,因此即使先开始下载 "segment_1.ts",它也可能在其他两个段之后完成。
这些片段都是一个大视频的一部分,因此请务必按照开始顺序而不是结束顺序写入下载片段的数据。
如果分段一个接一个地下载,我下面的代码工作得很好,但当同时下载多个分段时就不行了,因为有时它们不会按照开始的顺序完成。
我考虑过使用 Task.WhenAll,它可以保证正确的顺序,但我不想将下载的段不必要地保留在内存中,因为它们的大小可能有几兆字节。如果 "segment_1.ts" 的下载确实先完成,那么应该立即将其写入磁盘,而不必等待其他段完成。将所有 *.ts 段写入单独的文件并在最后加入它们也不是一种选择,因为它需要双磁盘 space 并且整个视频的大小可能有几千兆字节。
我不知道该怎么做,我想知道是否有人可以帮助我。我正在寻找一种不需要我手动创建线程或长时间阻塞 ThreadPool 线程的方法。
一些代码和异常处理已被删除,以便更容易查看正在发生的事情。
// Async BlockingCollection from the AsyncEx library
private AsyncCollection<byte[]> segmentDataQueue = new AsyncCollection<byte[]>();
public void Start()
{
RunConsumer();
RunProducer();
}
private async void RunProducer()
{
while (!_isCancelled)
{
var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false);
var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries);
if (!lines.Any() || lines[0] != "#EXTM3U")
throw new Exception("Invalid m3u8 media playlist.");
for (var i = 1; i < lines.Length; i++)
{
var line = lines[i];
if (line.StartsWith("#EXT-X-TARGETDURATION"))
{
ParseTargetDuration(line);
}
else if (line.StartsWith("#EXT-X-MEDIA-SEQUENCE"))
{
ParseMediaSequence(line);
}
else if (!line.StartsWith("#"))
{
if (_isNewSegment)
{
// Fire and forget
DownloadTsSegment(line);
}
}
}
// Wait until it's time to reload the m3u8 media playlist again
await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false);
}
}
// async void. We never await this method, so we can download multiple segments at once
private async void DownloadTsSegment(string tsUrl)
{
var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false);
var data = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
// Add the downloaded segment data to the AsyncCollection
await segmentDataQueue.AddAsync(data, _cts.Token).ConfigureAwait(false);
}
private async void RunConsumer()
{
using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read))
{
while (!_isCancelled)
{
// Wait until new segment data is added to the AsyncCollection and write it to disk
var data = await segmentDataQueue.TakeAsync(_cts.Token).ConfigureAwait(false);
await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
}
}
}
为每个下载分配一个序列号。将结果放入Dictionary<int, byte[]>
。每次下载完成时,它都会添加自己的结果。
然后检查是否有要写入磁盘的段:
while (dict.ContainsKey(lowestWrittenSegmentNumber + 1)) {
WriteSegment(dict[lowestWrittenSegmentNumber + 1]);
lowestWrittenSegmentNumber++;
}
这样一来,所有段都按顺序并缓冲地存储在磁盘上。
RunConsumer();
RunProducer();
确保使用 async Task
以便您可以使用 await Task.WhenAll(RunConsumer(), RunProducer());
等待完成。但是你应该不再需要 RunConsumer
。
我认为您在这里根本不需要 producer/consumer 队列。但是,我认为你应该避免 "fire and forget".
您可以同时启动它们,并在它们完成时处理它们。
首先,定义如何下载单个段:
private async Task<byte[]> DownloadTsSegmentAsync(string tsUrl)
{
var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false);
return await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
}
然后添加对播放列表的解析,从而生成 列表 片段下载(已经全部在进行中):
private List<Task<byte[]>> DownloadTasks(string data)
{
var result = new List<Task<byte[]>>();
string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries);
if (!lines.Any() || lines[0] != "#EXTM3U")
throw new Exception("Invalid m3u8 media playlist.");
...
if (_isNewSegment)
{
result.Add(DownloadTsSegmentAsync(line));
}
...
return result;
}
通过写入文件一次一个(按顺序)使用此列表:
private async Task RunConsumerAsync(List<Task<byte[]>> downloads)
{
using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read))
{
for (var task in downloads)
{
var data = await task.ConfigureAwait(false);
await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
}
}
}
并与制作人一起开始:
public async Task RunAsync()
{
// TODO: consider CancellationToken instead of a boolean.
while (!_isCancelled)
{
var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false);
var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
var tasks = DownloadTasks(data);
await RunConsumerAsync(tasks);
await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false);
}
}
请注意,此解决方案会 运行 所有并发下载,这会导致内存压力。如果这是一个问题,我建议您重组以使用 TPL 数据流,它内置了对节流的支持。