await SendAsync 不等待 TPL 数据流 BatchBlock

await SendAsync does not await on TPL Dataflow BatchBlock

示例程序有以下 BatchBlock:new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });,有 60 个 int 数据项被发送到其中,并在一个单独的任务上消耗。

问题是 await sourceBlock.SendAsync(i); 似乎没有等待,即使达到了 BatchBlock 的限制容量,数据仍在连续发送,而没有消耗任务先取出任何项目。最终 BatchBlock 仅接收 2 批 10 个 int 数据项。我希望 await sourceBlock.SendAsync(i); 在发送 20 个项目时暂停执行,因为块的边界容量设置为 10,最多 2 个组。然后在某个时候消费任务会收到数据并且这个过程会重复。

我附上了下面的代码,创建一个简单的控制台应用程序,将以下内容添加到 main:

BatchBlockIssueReplication().GetAwaiter().GetResult();

调用方法:

    public static async Task BatchBlockIssueReplication()
    {
        var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });

        // Reading data from the source block
        Task fireAndForget = Task.Run(async () =>
        {
            while (!sourceBlock.Completion.IsCanceled)
            {
                await Task.Delay(1500);
                if (await sourceBlock.OutputAvailableAsync() && sourceBlock.TryReceiveAll(out var results))
                {
                    Console.WriteLine("Received: ");
                    foreach (var result in results)
                    {
                        Console.Write($"{result.Length} ");
                    }
                    Console.WriteLine();
                }
            }
        });

        for (int i = 0; i < 60; i++)
        {
            Console.WriteLine($"Sending {i} to the source block");
            await sourceBlock.SendAsync(i);
        }
        Console.WriteLine("Finished sending data to the source block");

        await Task.Delay(10000);
    }

一旦达到最大值 await sourceBlock.SendAsync(i); 将不会暂停,因为该块主动拒绝更多项目。当这种情况发生时 SendAsync returns false 表示该块将不接受新消息。如果您写出 SendAsync 调用的结果,您可以看到块停止接收新消息的位置:

Sending 0 to the source block
True
Sending 1 to the source block
True
Sending 2 to the source block
True
Sending 3 to the source block
True
Sending 4 to the source block
True
Sending 5 to the source block
True
Sending 6 to the source block
True
Sending 7 to the source block
True
Sending 8 to the source block
True
Sending 9 to the source block
True
Sending 10 to the source block
True
Sending 11 to the source block
True
Sending 12 to the source block
True
Sending 13 to the source block
True
Sending 14 to the source block
True
Sending 15 to the source block
True
Sending 16 to the source block
True
Sending 17 to the source block
True
Sending 18 to the source block
True
Sending 19 to the source block
True
Sending 20 to the source block
False
Sending 21 to the source block
False
Sending 22 to the source block
False
Sending 23 to the source block
False
Sending 24 to the source block
False
Sending 25 to the source block
False
Sending 26 to the source block
False
Sending 27 to the source block
False
Sending 28 to the source block
False
Sending 29 to the source block
False
Sending 30 to the source block
False
Sending 31 to the source block
False
Sending 32 to the source block
False
Sending 33 to the source block
False
Sending 34 to the source block
False
Sending 35 to the source block
False
Sending 36 to the source block
False
Sending 37 to the source block
False
Sending 38 to the source block
False
Sending 39 to the source block
False
Sending 40 to the source block
False
Sending 41 to the source block
False
Sending 42 to the source block
False
Sending 43 to the source block
False
Sending 44 to the source block
False
Sending 45 to the source block
False
Sending 46 to the source block
False
Sending 47 to the source block
False
Sending 48 to the source block
False
Sending 49 to the source block
False
Sending 50 to the source block
False
Sending 51 to the source block
False
Sending 52 to the source block
False
Sending 53 to the source block
False
Sending 54 to the source block
False
Sending 55 to the source block
False
Sending 56 to the source block
False
Sending 57 to the source block
False
Sending 58 to the source block
False
Sending 59 to the source block
False
Finished sending data to the source block
Received: 
10 10

您尚未设置 BoundedCapacity,它控制 输入 缓冲区中可以等待的项目数。超过该值将使 SendAsync 等待。

您设置了 MaxNumberOfGroups 属性,这是该块在拒绝接收任何其他输入之前将生成多少组。

来自文档:

Gets or sets the maximum number of groups that should be generated by the block.

如果您希望您的块在输入缓冲区中保留例如 20 个块并等待,您应该设置 BoundedCapacity :

var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions 
                                          { 
                                              BoundedCapacity = 20 
                                          });