使用交换从 ConcurrentBag 中取出所有项目
Take all items from ConcurrentBag using a swap
我正试图从 ConcurrentBag
中一口气拿走所有物品。由于集合中没有 TryEmpty
之类的东西,因此我以与此处所述相同的方式使用 Interlocked.Exchange
:How to remove all Items from ConcurrentBag?
我的代码如下所示:
private ConcurrentBag<Foo> _allFoos; //Initialized in constructor.
public bool LotsOfThreadsAccessingThisMethod(Foo toInsert)
{
this._allFoos.Add(toInsert);
return true;
}
public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
var token = (CancellationToken) state;
var workingSet = new List<Foo>();
while (!token.IsCancellationRequested)
{
if (!workingSet.Any())
{
workingSet = Interlocked.Exchange(ref this._allFoos, new ConcurrentBag<Foo>).ToList();
}
var processingCount = (int)Math.Min(workingSet.Count, TRANSACTION_LIMIT);
if (processingCount > 0)
{
using (var ctx = new MyEntityFrameworkContext())
{
ctx.BulkInsert(workingSet.Take(processingCount));
}
workingSet.RemoveRange(0, processingCount);
}
}
}
问题是这有时会遗漏添加到列表中的项目。我编写了一个测试应用程序,将数据馈送到我的 ConcurrentBag.Add
方法并验证它正在发送所有数据。当我在 Add
调用上设置断点并检查之后 ConcurrentBag
的计数时,它为零。该项目只是没有被添加。
我相当肯定这是因为 Interlocked.Exchange
调用没有使用 ConcurrentBag
的内部锁定机制所以它在交换中的某处丢失了数据,但我不知道到底发生了什么。
如何在不使用我自己的锁定机制的情况下一次从 ConcurrentBag
中取出所有项目?为什么 Add
忽略该项目?
我认为不需要从 ConcurentBag
中取出所有物品。您只需按如下方式更改处理逻辑(无需自己的同步或互锁交换),即可实现与您尝试实现的行为完全相同的行为:
public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
var token = (CancellationToken)state;
var buffer = new List<Foo>(TRANSACTION_LIMIT);
while (!token.IsCancellationRequested)
{
Foo item;
if (!this._allFoos.TryTake(out item))
{
if (buffer.Count == 0) continue;
}
else
{
buffer.Add(item);
if (buffer.Count < TRANSACTION_LIMIT) continue;
}
using (var ctx = new MyEntityFrameworkContext())
{
ctx.BulkInsert(buffer);
}
buffer.Clear();
}
}
我正试图从 ConcurrentBag
中一口气拿走所有物品。由于集合中没有 TryEmpty
之类的东西,因此我以与此处所述相同的方式使用 Interlocked.Exchange
:How to remove all Items from ConcurrentBag?
我的代码如下所示:
private ConcurrentBag<Foo> _allFoos; //Initialized in constructor.
public bool LotsOfThreadsAccessingThisMethod(Foo toInsert)
{
this._allFoos.Add(toInsert);
return true;
}
public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
var token = (CancellationToken) state;
var workingSet = new List<Foo>();
while (!token.IsCancellationRequested)
{
if (!workingSet.Any())
{
workingSet = Interlocked.Exchange(ref this._allFoos, new ConcurrentBag<Foo>).ToList();
}
var processingCount = (int)Math.Min(workingSet.Count, TRANSACTION_LIMIT);
if (processingCount > 0)
{
using (var ctx = new MyEntityFrameworkContext())
{
ctx.BulkInsert(workingSet.Take(processingCount));
}
workingSet.RemoveRange(0, processingCount);
}
}
}
问题是这有时会遗漏添加到列表中的项目。我编写了一个测试应用程序,将数据馈送到我的 ConcurrentBag.Add
方法并验证它正在发送所有数据。当我在 Add
调用上设置断点并检查之后 ConcurrentBag
的计数时,它为零。该项目只是没有被添加。
我相当肯定这是因为 Interlocked.Exchange
调用没有使用 ConcurrentBag
的内部锁定机制所以它在交换中的某处丢失了数据,但我不知道到底发生了什么。
如何在不使用我自己的锁定机制的情况下一次从 ConcurrentBag
中取出所有项目?为什么 Add
忽略该项目?
我认为不需要从 ConcurentBag
中取出所有物品。您只需按如下方式更改处理逻辑(无需自己的同步或互锁交换),即可实现与您尝试实现的行为完全相同的行为:
public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
var token = (CancellationToken)state;
var buffer = new List<Foo>(TRANSACTION_LIMIT);
while (!token.IsCancellationRequested)
{
Foo item;
if (!this._allFoos.TryTake(out item))
{
if (buffer.Count == 0) continue;
}
else
{
buffer.Add(item);
if (buffer.Count < TRANSACTION_LIMIT) continue;
}
using (var ctx = new MyEntityFrameworkContext())
{
ctx.BulkInsert(buffer);
}
buffer.Clear();
}
}