使用交换从 ConcurrentBag 中取出所有项目

Take all items from ConcurrentBag using a swap

我正试图从 ConcurrentBag 中一口气拿走所有物品。由于集合中没有 TryEmpty 之类的东西,因此我以与此处所述相同的方式使用 Interlocked.ExchangeHow to remove all Items from ConcurrentBag?

我的代码如下所示:

private ConcurrentBag<Foo> _allFoos; //Initialized in constructor.

public bool LotsOfThreadsAccessingThisMethod(Foo toInsert)
{
    this._allFoos.Add(toInsert);
    return true;
}

public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
    var token = (CancellationToken) state;
    var workingSet = new List<Foo>();

    while (!token.IsCancellationRequested)
    {
        if (!workingSet.Any())
        {
            workingSet = Interlocked.Exchange(ref this._allFoos, new ConcurrentBag<Foo>).ToList();
        }

        var processingCount = (int)Math.Min(workingSet.Count, TRANSACTION_LIMIT);

        if (processingCount > 0)
        {
            using (var ctx = new MyEntityFrameworkContext())
            {
                ctx.BulkInsert(workingSet.Take(processingCount));
            }
            workingSet.RemoveRange(0, processingCount);
        }
    }
}

问题是这有时会遗漏添加到列表中的项目。我编写了一个测试应用程序,将数据馈送到我的 ConcurrentBag.Add 方法并验证它正在发送所有数据。当我在 Add 调用上设置断点并检查之后 ConcurrentBag 的计数时,它为零。该项目只是没有被添加。

我相当肯定这是因为 Interlocked.Exchange 调用没有使用 ConcurrentBag 的内部锁定机制所以它在交换中的某处丢失了数据,但我不知道到底发生了什么。

如何在不使用我自己的锁定机制的情况下一次从 ConcurrentBag 中取出所有项目?为什么 Add 忽略该项目?

我认为不需要从 ConcurentBag 中取出所有物品。您只需按如下方式更改处理逻辑(无需自己的同步或互锁交换),即可实现与您尝试实现的行为完全相同的行为:

public void SingleThreadProcessingLoopAsALongRunningTask(object state)
{
    var token = (CancellationToken)state;
    var buffer = new List<Foo>(TRANSACTION_LIMIT);
    while (!token.IsCancellationRequested)
    {
        Foo item;
        if (!this._allFoos.TryTake(out item))
        {
            if (buffer.Count == 0) continue;
        }
        else
        {
            buffer.Add(item);
            if (buffer.Count < TRANSACTION_LIMIT) continue;
        }
        using (var ctx = new MyEntityFrameworkContext())
        {
            ctx.BulkInsert(buffer);
        }
        buffer.Clear();
    }
}