Transform Block with parallelism and bounded capacity 延迟消息行为

Transform Block with parallelism and bounded capacity postponing message behavior

TransformBlock 有一个 MaxDegreeOfParallelism > 1BoundedCapacity 不是无限的,为什么它推迟接收更多的消息,尽管有一个长 运行 任务输入队列中有容量吗?

采用以下控制台应用程序。它创建一个带有 MaxDegreeOfParallelism = 5BoundedCapacity = 5 的 TransformBlock,然后向它提供 100 条消息。当块处理消息 x == 50 时,它会延迟该任务 10 秒。

TransformBlock<int, string> DoSomething = new TransformBlock<int, string>(async (x) => {
    if (x == 50)
    {
        Console.WriteLine("x == 50 reached, delaying for 10 seconds.");
        await Task.Delay(10000);
    }
    Console.WriteLine($"processed message {x}");
    return x.ToString();
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });

DoSomething.LinkTo(DataflowBlock.NullTarget<string>()); // ensure we empty the transform block

for (int i = 0; i < 100; i++)
{
    Stopwatch blockedTime = Stopwatch.StartNew();
    await DoSomething.SendAsync(i).ConfigureAwait(false);
    blockedTime.Stop();
    Console.WriteLine($"Submitted {i}\tBlocked for {blockedTime.ElapsedMilliseconds}ms.");
}

DoSomething.Complete();
await DoSomething.Completion;
Console.WriteLine("Completed.");
Console.ReadKey();

结果显示第50-54条消息全部被区块接收。消息 51-54 已完成,然后控制台 window 在显示消息 50 已完成且消息 55 能够被块接收之前 10 秒内没有显示任何输出。

...
Submitted 50    Blocked for 0ms.
Submitted 51    Blocked for 0ms.
processed message 51
Submitted 52    Blocked for 0ms.
x == 50 reached, delaying for 10 seconds.
processed message 52
processed message 53
Submitted 53    Blocked for 0ms.
Submitted 54    Blocked for 0ms.
processed message 54 // when run, 10 seconds pause happens after displaying this line
processed message 50 
processed message 55
Submitted 55    Blocked for 9998ms.
...

为什么Transform Block不继续填充block直到Bounded Capacity为5,而使用另外4个并行度继续处理消息?

ActionBlock 不会显示这些症状并继续处理其他可用并行线路上的消息。

无限容量 TransformBlock 也不会显示这些症状。

因为默认参数EnsureOrderedtrue,所以它试图保持结果的顺序。也就是说,它无法继续处理超过 BoundedCapacity,因为它需要维持秩序,这是您在测试中看到的 背压

此外,ActionBlock 不会表现出这种行为,因为它不会输出到任何其他块(可以这么说,这是一个死胡同),因此没有 [= 的概念26=]排序,背压仅受限于边界容量并行度.

DataflowBlockOptions.EnsureOrdered Property

By default, dataflow blocks enforce ordering on the processing of messages. Setting EnsureOrdered to false tells a block that it may relax this ordering if it's able to do so. This can be beneficial if making a processed result immediately available is more important than maintaining the input-to-output ordering.

解决方法是删除已排序的要求

new ExecutionDataflowBlockOptions 
       { 
           BoundedCapacity = 5,
           MaxDegreeOfParallelism = 5 ,
           EnsureOrdered = false
       });