以原子方式从 ConcurrentQueue 中获取所有内容

Atomically taking everything from a ConcurrentQueue

我有多个线程生成项目并将它们粘贴在一个公共区域 ConcurrentQueue:

private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    queuedItems.Enqueue(new GeneratedItem(...));
    // ...
}

我有另一个消费者线程,但它需要在此应用程序的上下文中工作的方式是,偶尔,它只需要获取当前线程队列中的所有内容 ,将其从该队列中删除,一次完成。类似于:

private Queue<GeneratedItem> GetAllNewItems () {

    return queuedItems.TakeEverything(); // <-- not a real method

}

我想我查看了所有文档(关于集合及其实现的接口),但我似乎没有找到任何类似 "concurrently take all objects from queue",甚至 "concurrently swap contents with another queue".[=19 的东西=]

如果我放弃 ConcurrentQueue 并用 lock 保护正常的 Queue,我可以做到这一点,就像这样:

private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    lock (queuedItems) {
        queuedItems.Enqueue(new GeneratedItem(...));
    }
    // ...
}

private Queue<GeneratedItem> GetAllNewItems () {

    lock (queuedItems) {
        Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
        queuedItems.Clear();
        return newItems;
    }

}

但是,我喜欢 ConcurrentQueue 的便利性,而且由于我刚刚学习 C#,所以我对 API 感到好奇;所以我的问题是,有没有办法对其中一个并发集合执行此操作?

是否有某种方法可以访问 ConcurrentQueue 使用的任何同步对象并出于我自己的目的将其锁定,以便一切都很好地协同工作?然后我可以锁定它,拿走一切,然后释放?

这取决于你想做什么。根据 source code

中的评论
//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.

这是通过内部调用 ToList() 实现的,后者又在 m_numSnapshotTakersspin 机制上运行

/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
   // Increments the number of active snapshot takers. This increment must happen before the snapshot is 
   // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
   // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. 
   Interlocked.Increment(ref m_numSnapshotTakers);

   List<T> list = new List<T>();
   try
   {
       //store head and tail positions in buffer, 
       Segment head, tail;
       int headLow, tailHigh;
       GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

       if (head == tail)
       {
           head.AddToList(list, headLow, tailHigh);
       }
       else
       {
           head.AddToList(list, headLow, SEGMENT_SIZE - 1);
           Segment curr = head.Next;
           while (curr != tail)
           {
               curr.AddToList(list, 0, SEGMENT_SIZE - 1);
               curr = curr.Next;
           }
           //Add tail segment
           tail.AddToList(list, 0, tailHigh);
       }
   }
   finally
   {
       // This Decrement must happen after copying is over. 
       Interlocked.Decrement(ref m_numSnapshotTakers);
   }
   return list;
}

如果快照就是您想要的,那么您很幸运。但是,似乎没有内置方法以线程安全的方式从 ConcurrentQueue 中获取和删除所有项目。您将需要使用 lock 或类似的方法来烘焙您自己的同步。或者自己动手(查看源代码可能并不难)。

没有这样的方法,因为TakeEverything实际上应该做什么是模棱两可的:

  1. 逐项取出,直到队列为空,然后 return 取出物品。
  2. 锁定对队列的全部访问,拍摄快照(循环拍摄所有项目)= 清除队列,解锁,return 快照。

考虑第一种情况,假设当您从队列中逐个删除项目时,其他线程正在写入队列 - TakeEverything 方法应该在结果中包含这些项目吗?

如果是那么你可以写成:

public List<GeneratedItem> TakeEverything()
{
    var list = new List<GeneratedItem>();

    while (queuedItems.TryDequeue(out var item))
    {
        list.Add(item);
    }

    return list;
}

如果没有,那么我仍然会使用 ConcurrentQueue (because all the instance members - methods and properties - from ordinary Queue are not thread safe) 并为每个 read/write 访问实施自定义锁定,因此您确保在 [=30] 时没有添加项目=] 来自队列。