BlockingCollection<T>.TakeFromAny,用于具有不同通用类型的集合

BlockingCollection<T>.TakeFromAny, for collections with a different generic type

.NET 中有一个 BlockingCollection<T>.TakeFromAny 方法。它首先尝试快速获取 Take,然后默认为等待底层句柄的 'slow' 方法。我想用它来听取供应 "Messages" 的上游生产商和供应 "Results".

的下游生产商的声音

下面的代码不是类型有效的,自然无法编译:

object anyValue;
var collection = new List<BlockingCollection<object>>();
// following fails: cannot convert
//    from 'System.Collections.Concurrent.BlockingCollection<Message>'
//    to 'System.Collections.Concurrent.BlockingCollection<object>'
collection.Add(new BlockingCollection<Message>());
// fails for same reason
collection.Add(new BlockingCollection<Result>());
BlockingCollection<object>.TakeFromAny(collection.ToArray(), out anyValue);

可以只处理 new BlockingCollection<object>() 个实例并在 Take 上强制转换以避免编译类型错误,尽管这让我感到不对(呃)——尤其是因为通过方法接口丢失了类型。使用包装组合类型可以解决后者; fsvo 'solve'.


下面没有任何内容与问题直接相关,尽管它为感兴趣的人提供了上下文。更高级别的构造(例如 Rx 或 TPL 数据流)不可用于提供核心基础结构功能的代码。

这是一个基本的流程模型。生产者、代理和工作人员 运行 在不同的线程上(工作人员可以 运行 在同一线程上,具体取决于任务调度程序的作用)。

[producer]   message -->   [proxy]   message --> [worker 1]
             <-- results             <-- results
                                     message --> [worker N..]
                                     <-- results

期望代理侦听消息(传入)和结果(返回)。代理会做一些转换和分组等工作,并将结果作为反馈。

将代理作为一个单独的线程将其与执行各种猴子业务的初始生产源隔离开来。工作任务是为了并行性,而不是异步性,线程(在争用 reduced/eliminated 之后,尽管代理中的分组)应该允许良好的缩放。

队列是在代理和工作人员之间建立的(而不是使用单个 input/result 的直接任务),因为当工作人员正在执行时,可能有额外的传入工作消息可以在它结束之前处理.这是为了确保工作人员可以 prolong/reuse 它在一系列相关工作中建立的上下文。

我认为这里最好的选择是将两个阻塞集合的类型更改为您已经提到的 BlockingCollection<object>,包括它的缺点。

如果您不能或不想这样做,另一种解决方案是合并 BlockingCollection<object> 和每个源集合的线程,将项目从其集合移动到合并的集合:

var producerCollection = new BlockingCollection<Message>();
var consumerCollection = new BlockingCollection<Results>();

var combinedCollection = new BlockingCollection<object>();

var producerCombiner = Task.Run(() =>
{
    foreach (var item in producerCollection.GetConsumingEnumerable())
    {
        combinedCollection.Add(item);
    }
});

var consumerCombiner = Task.Run(() =>
{
    foreach (var item in consumerCollection.GetConsumingEnumerable())
    {
        combinedCollection.Add(item);
    }
});

Task.WhenAll(producerCombiner, consumerCombiner)
    .ContinueWith(_ => combinedCollection.CompleteAdding());

foreach (var item in combinedCollection.GetConsumingEnumerable())
{
    // process item here
}

效率不高,因为它会阻塞两个额外的线程来执行此操作,但这是我能想到的最好的替代方法,无需使用反射来访问 TakeFromAny.[=14 使用的句柄=]