如何从优先于第一个集合的任意两个 BlockingCollections 中取出一个项目?

How to take an item from any two BlockingCollections with priority to the first collection?

我有两个 BlockingCollection<T> 对象,collection1collection2。我想使用这些集合中的项目,优先考虑 collection1 中的项目。也就是说,如果两个集合都有项目,我想先从 collection1 中取出项目。如果其中 none 个有物品,我想等待物品可用。

我有以下代码:

public static T Take<T>(
    BlockingCollection<T> collection1,
    BlockingCollection<T> collection2) where T:class
{
    if (collection1.TryTake(out var item1))
    {
        return item1;
    }

    T item2;

    try
    {
        BlockingCollection<T>.TakeFromAny(
            new[] { collection1, collection2 },
            out item2);
    }
    catch (ArgumentException)
    {
        return null;
    }

    return item2;
}

当对两个集合调用 CompleteAdding 且它们均为空时,此代码应 return null

此代码的主要问题是 TakeFromAny 方法的文档指定如果 CompleteAdding 在“集合”上调用,TakeFromAny 将抛出 ArgumentException ":

ArgumentException

The collections argument is a 0-length array or contains a null element or CompleteAdding() has been called on the collection.

如果在任何集合上调用 CompleteAdding 是否会抛出异常?还是两个系列?

如果调用 CompleteAdding 并且集合中仍有一些项目,它会抛出吗?

我需要一个可靠的方法来做到这一点。

在这段代码中,我试图首先从 collection1 获取,因为 TakeFromAny 的文档不提供任何关于如果两个集合都有项目时从中获取项目的收集顺序的保证.

这也意味着,如果两个集合都是空的,然后它们稍后同时收到项目,那么我可能会先从 collection2 中获得一个项目,这很好。

编辑:

我将项目添加到两个集合(而不仅仅是一个集合)的原因是第一个集合没有上限,而第二个集合有。

对于那些对我为什么需要这个感兴趣的人有更多详细信息:

我在一个名为 ProceduralDataflow 的开源项目中使用它。详情请看这里https://github.com/ymassad/ProceduralDataflow

数据流系统中的每个处理节点都有两个集合,一个集合将包含第一次出现的项目(因此如果需要我需要减慢生产者的速度),另一个集合将包含第二次出现的项目(或第三次,..) 次(由于数据流中的循环)。

一个集合没有上限的原因是我不希望由于数据流中的循环而出现死锁。

首先,简短回答您的具体问题。

Does it throw if CompleteAdding was called on any collection? or both collections?

两者(全部)- 但前提是任何集合中都没有可用元素。

What if CompleteAdding was called and the collection still has some items, does it throw?

没有。如果集合中有可用的元素,将从集合中移除并返回给调用者。

结论

显然文档不清楚。部分

or CompleteAdding() has been called on the collection

应该有不同的表述 - 类似于

or there is no available element in any of the collections and CompleteAdding() has been called on all the collections

理由

好吧,我知道依赖实现不是一个好的做法,但是当文档不清楚时,实现是我能想到的唯一可靠的官方来源。所以取reference source, both TakeFromAny and TryTakeFromAny call a private method TryTakeFromAnyCore。它以以下内容开头:

ValidateCollectionsArray(collections, false);

false 这里有一个名为 isAddOperationbool 参数,在 ValidateCollectionsArray 中使用如下:

if (isAddOperation && collections[i].IsAddingCompleted)
{
    throw new ArgumentException(
        SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
}

这是为调用 CompleteAdding() 的集合抛出 ArgumentException 的可能位置之一。正如我们所见,情况并非如此(问题 #1)。

然后继续执行下面的"fast path":

//try the fast path first
for (int i = 0; i < collections.Length; i++)
{
    // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
    if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
        return i;
}

这证明了问题 #2 的答案。

最后,如果任何集合中都没有可用的元素,实现通过调用另一个私有方法TryTakeFromAnyCoreSlow来获取"slow path",下面的注释是实现的基本解释行为:

//Loop until one of these conditions is met:
// 1- The operation is succeeded
// 2- The timeout expired for try* versions
// 3- The external token is cancelled, throw
// 4- The operation is TryTake and all collections are marked as completed, return false
// 5- The operation is Take and all collection are marked as completed, throw

我们两个问题的答案都在案例 #1 和案例 #5 中(注意单词 all)。顺便说一句,它还显示了 TakeFromAnyTryTakeFromAny 之间的唯一区别 - 案例 #4 与 #5,即 throwreturn -1.