使用具有 1 个同步生产者的生产者-消费者处理流数据
Process stream data using producer-consumers with 1 synchronous producer
我有一个具有以下工作流程的应用程序。
- 用户使用minio
上传基于行的json文件(每行是一条记录)
- 然后他们发送申请请求进行处理。
然后应用程序开始使用具有以下签名
的this方法将数据作为流下载
Task GetObjectAsync(string bucketName, string objectName, Action<Stream> callback)
我使用的回调方法是这样的:
void ProcessLine(Stream s)
{
using(var streamReader = new StreamReader(s))
{
while(!streamReader.EndOfStream)
{
var line = streamReader.ReadLine(); // notice that I can't use ReadLineAsync
var obj = DeserializeLine(line);
// some other operations
database.Store(obj)
// there is an alternative StoreAsync() which I can' use
}
}
}
只要我不需要使用方法的异步版本并且文件相对较小,它就可以很好地工作。
不幸的是,我需要为只有一个但非常大的文件(20gb 或内存无法容纳的任何文件,想象一个巨大的数据集)的用例做好准备。
为此,我决定使用实施生产者-消费者队列,该队列将从回调操作中填充,然后由一些工作人员处理。
我使用了ConcurentQueue作为数据结构和下面的回调
void PopulateQueue(Stream s)
{
using(var streamReader = new StreamReader(s))
{
while(!streamReader.EndOfStream)
{
var line = streamReader.ReadLine();
var obj = DeserializeLine(line);
_queue.Enqueue(obj); // _queue is a private field of a type ConcurentQueue<MyObject>
}
}
}
工作人员的处理方式与原始版本相同,但使用异步方法。
这里的问题是当生产者填充消费者正在处理的队列(出列数据)时要快得多。队列开始增长,正如所假设的那样,由于文件非常大,它将耗尽所有内存。
一个明显的解决方案是限制队列中的记录数量。但我不知道如何在同步回调中做到这一点。在异步队列中,只要队列中的记录太多,我就会使用 await Task.Delay(100)
。
根据 this article,我应该避免使用 Task.Wait()
,因为它对性能或死锁的可能性有负面影响。
我阅读了 Stephen Cleary 的一些关于 .Net 中异步最佳实践的文章。不幸的是,我从他们那里了解到,在这种情况下,没有正确的方法从同步回调中调用异步方法,我对使用 Thread.Sleep()
或忙等待感到不好。
对于如何在不违反异步准则的情况下使用生产者-消费者模式或以其他方式解决此问题,您有什么建议吗?
谢谢。
注意:我考虑过在上传文件时将文件分成固定大小的块,但它有其自身的缺陷。
正如@Hans Passant 在评论中指出的那样,BlockingCollection 解决了这个问题。
它有内部项目的限制,当达到限制时,它会阻塞生产者,直到项目数量减少。
我有一个具有以下工作流程的应用程序。
- 用户使用minio 上传基于行的json文件(每行是一条记录)
- 然后他们发送申请请求进行处理。
然后应用程序开始使用具有以下签名
的this方法将数据作为流下载Task GetObjectAsync(string bucketName, string objectName, Action<Stream> callback)
我使用的回调方法是这样的:
void ProcessLine(Stream s)
{
using(var streamReader = new StreamReader(s))
{
while(!streamReader.EndOfStream)
{
var line = streamReader.ReadLine(); // notice that I can't use ReadLineAsync
var obj = DeserializeLine(line);
// some other operations
database.Store(obj)
// there is an alternative StoreAsync() which I can' use
}
}
}
只要我不需要使用方法的异步版本并且文件相对较小,它就可以很好地工作。
不幸的是,我需要为只有一个但非常大的文件(20gb 或内存无法容纳的任何文件,想象一个巨大的数据集)的用例做好准备。
为此,我决定使用实施生产者-消费者队列,该队列将从回调操作中填充,然后由一些工作人员处理。
我使用了ConcurentQueue作为数据结构和下面的回调
void PopulateQueue(Stream s)
{
using(var streamReader = new StreamReader(s))
{
while(!streamReader.EndOfStream)
{
var line = streamReader.ReadLine();
var obj = DeserializeLine(line);
_queue.Enqueue(obj); // _queue is a private field of a type ConcurentQueue<MyObject>
}
}
}
工作人员的处理方式与原始版本相同,但使用异步方法。
这里的问题是当生产者填充消费者正在处理的队列(出列数据)时要快得多。队列开始增长,正如所假设的那样,由于文件非常大,它将耗尽所有内存。
一个明显的解决方案是限制队列中的记录数量。但我不知道如何在同步回调中做到这一点。在异步队列中,只要队列中的记录太多,我就会使用 await Task.Delay(100)
。
根据 this article,我应该避免使用 Task.Wait()
,因为它对性能或死锁的可能性有负面影响。
我阅读了 Stephen Cleary 的一些关于 .Net 中异步最佳实践的文章。不幸的是,我从他们那里了解到,在这种情况下,没有正确的方法从同步回调中调用异步方法,我对使用 Thread.Sleep()
或忙等待感到不好。
对于如何在不违反异步准则的情况下使用生产者-消费者模式或以其他方式解决此问题,您有什么建议吗?
谢谢。
注意:我考虑过在上传文件时将文件分成固定大小的块,但它有其自身的缺陷。
正如@Hans Passant 在评论中指出的那样,BlockingCollection 解决了这个问题。
它有内部项目的限制,当达到限制时,它会阻塞生产者,直到项目数量减少。