以原子方式从 ConcurrentQueue 中获取所有内容
Atomically taking everything from a ConcurrentQueue
我有多个线程生成项目并将它们粘贴在一个公共区域 ConcurrentQueue
:
private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();
private void BunchOfThreads () {
// ...
queuedItems.Enqueue(new GeneratedItem(...));
// ...
}
我有另一个消费者线程,但它需要在此应用程序的上下文中工作的方式是,偶尔,它只需要获取当前线程队列中的所有内容 ,将其从该队列中删除,一次完成。类似于:
private Queue<GeneratedItem> GetAllNewItems () {
return queuedItems.TakeEverything(); // <-- not a real method
}
我想我查看了所有文档(关于集合及其实现的接口),但我似乎没有找到任何类似 "concurrently take all objects from queue",甚至 "concurrently swap contents with another queue".[=19 的东西=]
如果我放弃 ConcurrentQueue
并用 lock
保护正常的 Queue
,我可以做到这一点,就像这样:
private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();
private void BunchOfThreads () {
// ...
lock (queuedItems) {
queuedItems.Enqueue(new GeneratedItem(...));
}
// ...
}
private Queue<GeneratedItem> GetAllNewItems () {
lock (queuedItems) {
Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
queuedItems.Clear();
return newItems;
}
}
但是,我喜欢 ConcurrentQueue
的便利性,而且由于我刚刚学习 C#,所以我对 API 感到好奇;所以我的问题是,有没有办法对其中一个并发集合执行此操作?
是否有某种方法可以访问 ConcurrentQueue
使用的任何同步对象并出于我自己的目的将其锁定,以便一切都很好地协同工作?然后我可以锁定它,拿走一切,然后释放?
这取决于你想做什么。根据 source code
中的评论
//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.
这是通过内部调用 ToList() 实现的,后者又在 m_numSnapshotTakers
和 spin 机制上运行
/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
// Increments the number of active snapshot takers. This increment must happen before the snapshot is
// taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
// eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0.
Interlocked.Increment(ref m_numSnapshotTakers);
List<T> list = new List<T>();
try
{
//store head and tail positions in buffer,
Segment head, tail;
int headLow, tailHigh;
GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
if (head == tail)
{
head.AddToList(list, headLow, tailHigh);
}
else
{
head.AddToList(list, headLow, SEGMENT_SIZE - 1);
Segment curr = head.Next;
while (curr != tail)
{
curr.AddToList(list, 0, SEGMENT_SIZE - 1);
curr = curr.Next;
}
//Add tail segment
tail.AddToList(list, 0, tailHigh);
}
}
finally
{
// This Decrement must happen after copying is over.
Interlocked.Decrement(ref m_numSnapshotTakers);
}
return list;
}
如果快照就是您想要的,那么您很幸运。但是,似乎没有内置方法以线程安全的方式从 ConcurrentQueue
中获取和删除所有项目。您将需要使用 lock
或类似的方法来烘焙您自己的同步。或者自己动手(查看源代码可能并不难)。
没有这样的方法,因为TakeEverything
实际上应该做什么是模棱两可的:
- 逐项取出,直到队列为空,然后 return 取出物品。
- 锁定对队列的全部访问,拍摄快照(循环拍摄所有项目)= 清除队列,解锁,return 快照。
考虑第一种情况,假设当您从队列中逐个删除项目时,其他线程正在写入队列 - TakeEverything
方法应该在结果中包含这些项目吗?
如果是那么你可以写成:
public List<GeneratedItem> TakeEverything()
{
var list = new List<GeneratedItem>();
while (queuedItems.TryDequeue(out var item))
{
list.Add(item);
}
return list;
}
如果没有,那么我仍然会使用 ConcurrentQueue
(because all the instance members - methods and properties - from ordinary Queue
are not thread safe) 并为每个 read/write 访问实施自定义锁定,因此您确保在 [=30] 时没有添加项目=] 来自队列。
我有多个线程生成项目并将它们粘贴在一个公共区域 ConcurrentQueue
:
private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();
private void BunchOfThreads () {
// ...
queuedItems.Enqueue(new GeneratedItem(...));
// ...
}
我有另一个消费者线程,但它需要在此应用程序的上下文中工作的方式是,偶尔,它只需要获取当前线程队列中的所有内容 ,将其从该队列中删除,一次完成。类似于:
private Queue<GeneratedItem> GetAllNewItems () {
return queuedItems.TakeEverything(); // <-- not a real method
}
我想我查看了所有文档(关于集合及其实现的接口),但我似乎没有找到任何类似 "concurrently take all objects from queue",甚至 "concurrently swap contents with another queue".[=19 的东西=]
如果我放弃 ConcurrentQueue
并用 lock
保护正常的 Queue
,我可以做到这一点,就像这样:
private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();
private void BunchOfThreads () {
// ...
lock (queuedItems) {
queuedItems.Enqueue(new GeneratedItem(...));
}
// ...
}
private Queue<GeneratedItem> GetAllNewItems () {
lock (queuedItems) {
Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
queuedItems.Clear();
return newItems;
}
}
但是,我喜欢 ConcurrentQueue
的便利性,而且由于我刚刚学习 C#,所以我对 API 感到好奇;所以我的问题是,有没有办法对其中一个并发集合执行此操作?
是否有某种方法可以访问 ConcurrentQueue
使用的任何同步对象并出于我自己的目的将其锁定,以便一切都很好地协同工作?然后我可以锁定它,拿走一切,然后释放?
这取决于你想做什么。根据 source code
中的评论//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.
这是通过内部调用 ToList() 实现的,后者又在 m_numSnapshotTakers
和 spin 机制上运行
/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
// Increments the number of active snapshot takers. This increment must happen before the snapshot is
// taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
// eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0.
Interlocked.Increment(ref m_numSnapshotTakers);
List<T> list = new List<T>();
try
{
//store head and tail positions in buffer,
Segment head, tail;
int headLow, tailHigh;
GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
if (head == tail)
{
head.AddToList(list, headLow, tailHigh);
}
else
{
head.AddToList(list, headLow, SEGMENT_SIZE - 1);
Segment curr = head.Next;
while (curr != tail)
{
curr.AddToList(list, 0, SEGMENT_SIZE - 1);
curr = curr.Next;
}
//Add tail segment
tail.AddToList(list, 0, tailHigh);
}
}
finally
{
// This Decrement must happen after copying is over.
Interlocked.Decrement(ref m_numSnapshotTakers);
}
return list;
}
如果快照就是您想要的,那么您很幸运。但是,似乎没有内置方法以线程安全的方式从 ConcurrentQueue
中获取和删除所有项目。您将需要使用 lock
或类似的方法来烘焙您自己的同步。或者自己动手(查看源代码可能并不难)。
没有这样的方法,因为TakeEverything
实际上应该做什么是模棱两可的:
- 逐项取出,直到队列为空,然后 return 取出物品。
- 锁定对队列的全部访问,拍摄快照(循环拍摄所有项目)= 清除队列,解锁,return 快照。
考虑第一种情况,假设当您从队列中逐个删除项目时,其他线程正在写入队列 - TakeEverything
方法应该在结果中包含这些项目吗?
如果是那么你可以写成:
public List<GeneratedItem> TakeEverything()
{
var list = new List<GeneratedItem>();
while (queuedItems.TryDequeue(out var item))
{
list.Add(item);
}
return list;
}
如果没有,那么我仍然会使用 ConcurrentQueue
(because all the instance members - methods and properties - from ordinary Queue
are not thread safe) 并为每个 read/write 访问实施自定义锁定,因此您确保在 [=30] 时没有添加项目=] 来自队列。