处理保存在 ConcurrentQueue<T> 中的数据

Working with data held in a ConcurrentQueue<T>

我有一个后台工作程序,可以流式传输数据并将其保存到 ConcurrentQueue<T>,这是我需要的,因为它是线程安全的先进先出集合,但我还需要执行诸如执行简单的任务计算或从该集合中提取数据,我不确定此时我需要使用什么。下面是一些示例伪代码:

public class ExampleData
{
     public DateTime Date { get; set; }
     public decimal Value { get; set; }
}

public ConcurrentQueue<ExampleData> QueueCol { get; set; } = new();

public void AddToQueue(DateTime date, decimal value)
{
     QueueCol.Enqueue(new ExampleData() { Date = date, Value = value });
}

public void DisplayPastData()
{
     var count = QueueCol.Count();
     var prev1Data = count >= 2 ? QueueCol.ElementAt(count - 2) : null;
     var prev2Data = count >= 3 ? QueueCol.ElementAt(count - 3) : null;
     var prev3Data = count >= 4 ? QueueCol.ElementAt(count - 4) : null;

     if (prev1Data != null)
     {
         Console.WriteLine($"Date: {prev1Data.Date} Value: {prev1Data.Value}");
     }
     
     if (prev2Data != null)
     {
         Console.WriteLine($"Date: {prev2Data.Date} Value: {prev2Data.Value}");
     }

     if (prev3Data != null)
     {
         Console.WriteLine($"Date: {prev3Data.Date} Value: {prev3Data.Value}");
     }
}

这是一个非常粗略的例子,但即使显示了数据,它的大部分看起来都是正确的,然后我会得到完全不在左边字段中的日期,比如前一天的日期在当天的日期之间,所以因为像这样的排序问题我知道数据不正确所以我的问题是如何将并发队列转换为一个新集合,使我能够保持顺序并处理数据而不会给出不正确的结果?

您在问题中描述的使用模式使得 ConcurrentQueue<T> 不适合您的场景。据我所知,要求是:

  1. 生产者应该能够将集合中的项目排入队列而不会被阻塞任何时间。
  2. 消费者应该能够在集合的快照上执行计算,而无需创建昂贵的集合副本,并且不会以任何方式干扰生产者。

开箱即用似乎更适合您的场景的集合是ImmutableList<T>。这个集合可以用 lock-free Interlocked 操作更新,它本身就是一个快照(因为它是不可变的)。以下是如何在多线程场景中使用它,使用 thread-safety 并且不阻塞任何线程:

private ImmutableList<ExampleData> _data = ImmutableList<ExampleData>.Empty;

public ImmutableList<ExampleData> Data => Volatile.Read(ref _data);

public void AddToQueue(DateTime date, decimal value)
{
    var newData = new ExampleData() { Date = date, Value = value };
    ImmutableInterlocked.Update(ref _data, (x, y) => x.Add(y), newData);
}

public void DisplayPastData()
{
    ImmutableList<ExampleData> snapshot = Volatile.Read(ref _data);
    int count = snapshot.Count;
    var prev1Data = count >= 2 ? snapshot[count - 2] : null;
    var prev2Data = count >= 3 ? snapshot[count - 3] : null;
    var prev3Data = count >= 4 ? snapshot[count - 4] : null;

    if (prev1Data != null)
    {
        Console.WriteLine($"Date: {prev1Data.Date} Value: {prev1Data.Value}");
    }

    if (prev2Data != null)
    {
        Console.WriteLine($"Date: {prev2Data.Date} Value: {prev2Data.Value}");
    }

    if (prev3Data != null)
    {
        Console.WriteLine($"Date: {prev3Data.Date} Value: {prev3Data.Value}");
    }
}

不可变集合并非没有缺点。与普通集合相比,它们是 lot slower,它们需要更多的内存,并且每次更新时都会产生更多的垃圾。

针对您的特定情况的最佳解决方案可能是 ConcurrentQueue<ExampleData>(最新数据)和 List<ExampleData>(历史数据)的组合。生产者将 ConcurrentQueue<T> 中的项目排入队列,单个消费者将从 ConcurrentQueue<T> 中的所有项目出队,然后将它们添加到 List<T> 中。然后它将使用 List<T> 进行计算。