"bounded" 批处理块 => 动作块。如何完成正确的方法?

"bounded" BatchBlock => ActionBlock. How to complete the proper way?

我正在尝试使用链接到操作块的有界批处理块。 我知道 batchblock 中的项目何时结束,我想触发一个完成链。

问题是:如果我的 BatchBlock<T> 是给定的 BoundedCapacity,我将不会在操作块中触发所有项目。

这是我的问题示例,它应该(根据我对 TPL 数据流的理解...)打印 0 到 124 但它最终打印 0 到 99。

一定是我遗漏了什么...也许 BoundedCapacity 意味着 "drop items when queue count is over xxx..." 如果是这样,我怎样才能达到保证的最大内存消耗?

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            int itemsCount = 125;
            List<int> ints = new List<int>(itemsCount);
            for (int i = 0; i < itemsCount; i++)
                ints.Add(i);

            BatchBlock<int> batchBlock = new BatchBlock<int>(50,new GroupingDataflowBlockOptions(){BoundedCapacity = 100});
            ActionBlock<int[]> actionBlock = new ActionBlock<int[]>(intsBatch =>
            {
                Thread.Sleep(1000);
                foreach (int i in intsBatch)
                    Console.WriteLine(i);               
            });
            batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });

            // feed the batch block
            foreach (int i in ints)
                batchBlock.Post(i);
            // Don't know how to end the proper way... Meaning it should display 0 to 124 and not 0 to 99
            batchBlock.Complete();
            batchBlock.TriggerBatch();
            actionBlock.Completion.Wait();
        }
    }
}

Post 上的块并不总是成功。它尝试 post 向块发送消息,但如果达到 BoundedCapacity 它将失败并且 return false.

您可以使用 SendAsync 而不是 return 等待任务。如果该块有空间容纳您的消息,它将异步完成。如果没有,那么块 return 是一个任务,当它有空间接受新消息时将完成。您可以等待该任务并限制您的插入:

async Task MainAsync()
{
    var ints = Enumerable.Range(0, 125).ToList();
    var batchBlock = new BatchBlock<int>(50, new GroupingDataflowBlockOptions { BoundedCapacity = 100 });
    var actionBlock = new ActionBlock<int[]>(intsBatch =>
    {
        Thread.Sleep(1000);
        foreach (var i in intsBatch)
            Console.WriteLine(i);
    });
    batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var i in ints)
        await batchBlock.SendAsync(i); // wait synchronously for the block to accept.

    batchBlock.Complete();
    await actionBlock.Completion;
}