具有读取和写入批量数据能力的生产者消费者集合

Producer consumer collection with ability to read and write batches of data

我正在寻找像 BufferBlock

这样的合集

但使用如下方法:

SendAsync<T>(T[])
T[] ReceiveAsync<T>()

有人可以帮忙吗?

ReceiveAsync and SendAsync 可用作 ISourceBlock 和 ITargetBlockT<> 接口上的扩展方法。这意味着您必须将块转换为那些接口才能使用扩展方法,例如:

var buffer=new BufferBlock<string>();

var source=(ISourceBlock<string>)buffer;
var target=(ITargetBlock<string>)buffer;

await target.SendAsync("something");

通常这不是问题,因为所有 Dataflow 方法都接受接口,而不是具体类型,例如:

async Task MyProducer(ITargetBlock<string> target)
{
    ...
    await target.SendAsync(..);
    ...
    target.Complete();
}

async Task MyConsumer(ISourceBlock<string> target)
{
    ...
    var message=await target.ReceiveAsync();
    ...
}

public static async Task Main()
{
    var buffer=new BufferBlock<string>();
    MyProducer(buffer);
    await MyConsumer(buffer);
}

这些方法不可用,SendAsync<T> 只需要一个 TRecieveAsync<T> 只需要 return 一个 T,而不是数组。

SendAsync<T>(T[])
T[] ReceiveAsync<T>()

但是有 TryReceiveAll<T>(out IList<T> items),你可以在循环中调用 SendAsync<T> 将数组发送到 BufferBlock 或者编写你自己的扩展方法,像这样:

public static async Task SendAllAsync<T>(this ITargetBlock<T> block, IEnumerable<T> items)
{
    foreach(var item in items)
    {
         await block.SendAsync(item)
    }
}

请注意,SendAsync 执行 return 一个表示接受消息的布尔值,您可以 return 一个布尔值数组,或者只是 return 如果它们中的任何一个返回false 但这取决于你。

可能使用 BatchBlock<T> 会更容易,您可以使用循环将项目单独发送给但分批发送项目,这比使用 TryRecieveAll 更容易,如果您正在建设管道。 BatchBlock Walkthrough and BatchBlock Example