使用具有 1 个同步生产者的生产者-消费者处理流数据

Process stream data using producer-consumers with 1 synchronous producer

我有一个具有以下工作流程的应用程序。

  1. 用户使用minio
  2. 上传基于行的json文件(每行是一条记录)
  3. 然后他们发送申请请求进行处理。

然后应用程序开始使用具有以下签名

this方法将数据作为流下载
Task GetObjectAsync(string bucketName, string objectName, Action<Stream> callback)

我使用的回调方法是这样的:

void ProcessLine(Stream s)
{
    using(var streamReader = new StreamReader(s))
    {
        while(!streamReader.EndOfStream)
        {
            var line = streamReader.ReadLine(); // notice that I can't use ReadLineAsync
            var obj = DeserializeLine(line);
            // some other operations
            database.Store(obj) 
            // there is an alternative StoreAsync() which I can' use
        }
    }
}

只要我不需要使用方法的异步版本并且文件相对较小,它就可以很好地工作。

不幸的是,我需要为只有一个但非常大的文件(20gb 或内存无法容纳的任何文件,想象一个巨大的数据集)的用例做好准备。

为此,我决定使用实施生产者-消费者队列,该队列将从回调操作中填充,然后由一些工作人员处理。

我使用了ConcurentQueue作为数据结构和下面的回调

void PopulateQueue(Stream s)
{
    using(var streamReader = new StreamReader(s))
    {
        while(!streamReader.EndOfStream)
        {
            var line = streamReader.ReadLine();
            var obj = DeserializeLine(line);
            _queue.Enqueue(obj); // _queue is a private field of a type ConcurentQueue<MyObject> 
        }
    }
}

工作人员的处理方式与原始版本相同,但使用异步方法。

这里的问题是当生产者填充消费者正在处理的队列(出列数据)时要快得多。队列开始增长,正如所假设的那样,由于文件非常大,它将耗尽所有内存。

一个明显的解决方案是限制队列中的记录数量。但我不知道如何在同步回调中​​做到这一点。在异步队列中,只要队列中的记录太多,我就会使用 await Task.Delay(100)

根据 this article,我应该避免使用 Task.Wait(),因为它对性能或死锁的可能性有负面​​影响。

我阅读了 Stephen Cleary 的一些关于 .Net 中异步最佳实践的文章。不幸的是,我从他们那里了解到,在这种情况下,没有正确的方法从同步回调中​​调用异步方法,我对使用 Thread.Sleep() 或忙等待感到不好。

对于如何在不违反异步准则的情况下使用生产者-消费者模式或以其他方式解决此问题,您有什么建议吗?

谢谢。

注意:我考虑过在上传文件时将文件分成固定大小的块,但它有其自身的缺陷。

正如@Hans Passant 在评论中指出的那样,BlockingCollection 解决了这个问题。

它有内部项目的限制,当达到限制时,它会阻塞生产者,直到项目数量减少。