Transform Block with parallelism and bounded capacity 延迟消息行为
Transform Block with parallelism and bounded capacity postponing message behavior
当 TransformBlock
有一个 MaxDegreeOfParallelism > 1
和 BoundedCapacity
不是无限的,为什么它推迟接收更多的消息,尽管有一个长 运行 任务输入队列中有容量吗?
采用以下控制台应用程序。它创建一个带有 MaxDegreeOfParallelism = 5
和 BoundedCapacity = 5
的 TransformBlock,然后向它提供 100 条消息。当块处理消息 x == 50
时,它会延迟该任务 10 秒。
TransformBlock<int, string> DoSomething = new TransformBlock<int, string>(async (x) => {
if (x == 50)
{
Console.WriteLine("x == 50 reached, delaying for 10 seconds.");
await Task.Delay(10000);
}
Console.WriteLine($"processed message {x}");
return x.ToString();
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });
DoSomething.LinkTo(DataflowBlock.NullTarget<string>()); // ensure we empty the transform block
for (int i = 0; i < 100; i++)
{
Stopwatch blockedTime = Stopwatch.StartNew();
await DoSomething.SendAsync(i).ConfigureAwait(false);
blockedTime.Stop();
Console.WriteLine($"Submitted {i}\tBlocked for {blockedTime.ElapsedMilliseconds}ms.");
}
DoSomething.Complete();
await DoSomething.Completion;
Console.WriteLine("Completed.");
Console.ReadKey();
结果显示第50-54条消息全部被区块接收。消息 51-54 已完成,然后控制台 window 在显示消息 50 已完成且消息 55 能够被块接收之前 10 秒内没有显示任何输出。
...
Submitted 50 Blocked for 0ms.
Submitted 51 Blocked for 0ms.
processed message 51
Submitted 52 Blocked for 0ms.
x == 50 reached, delaying for 10 seconds.
processed message 52
processed message 53
Submitted 53 Blocked for 0ms.
Submitted 54 Blocked for 0ms.
processed message 54 // when run, 10 seconds pause happens after displaying this line
processed message 50
processed message 55
Submitted 55 Blocked for 9998ms.
...
为什么Transform Block不继续填充block直到Bounded Capacity为5,而使用另外4个并行度继续处理消息?
ActionBlock
不会显示这些症状并继续处理其他可用并行线路上的消息。
无限容量 TransformBlock
也不会显示这些症状。
因为默认参数EnsureOrdered
是true
,所以它试图保持结果的顺序。也就是说,它无法继续处理超过 BoundedCapacity
,因为它需要维持秩序,这是您在测试中看到的 背压。
此外,ActionBlock
不会表现出这种行为,因为它不会输出到任何其他块(可以这么说,这是一个死胡同),因此没有 [= 的概念26=]排序,背压仅受限于边界容量和并行度.
DataflowBlockOptions.EnsureOrdered Property
By default, dataflow blocks enforce ordering on the processing of
messages. Setting EnsureOrdered
to false tells a block that it may relax this ordering if it's able to do so.
This can be beneficial if making a processed result immediately
available is more important than maintaining the input-to-output
ordering.
解决方法是删除已排序的要求
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = 5 ,
EnsureOrdered = false
});
当 TransformBlock
有一个 MaxDegreeOfParallelism > 1
和 BoundedCapacity
不是无限的,为什么它推迟接收更多的消息,尽管有一个长 运行 任务输入队列中有容量吗?
采用以下控制台应用程序。它创建一个带有 MaxDegreeOfParallelism = 5
和 BoundedCapacity = 5
的 TransformBlock,然后向它提供 100 条消息。当块处理消息 x == 50
时,它会延迟该任务 10 秒。
TransformBlock<int, string> DoSomething = new TransformBlock<int, string>(async (x) => {
if (x == 50)
{
Console.WriteLine("x == 50 reached, delaying for 10 seconds.");
await Task.Delay(10000);
}
Console.WriteLine($"processed message {x}");
return x.ToString();
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });
DoSomething.LinkTo(DataflowBlock.NullTarget<string>()); // ensure we empty the transform block
for (int i = 0; i < 100; i++)
{
Stopwatch blockedTime = Stopwatch.StartNew();
await DoSomething.SendAsync(i).ConfigureAwait(false);
blockedTime.Stop();
Console.WriteLine($"Submitted {i}\tBlocked for {blockedTime.ElapsedMilliseconds}ms.");
}
DoSomething.Complete();
await DoSomething.Completion;
Console.WriteLine("Completed.");
Console.ReadKey();
结果显示第50-54条消息全部被区块接收。消息 51-54 已完成,然后控制台 window 在显示消息 50 已完成且消息 55 能够被块接收之前 10 秒内没有显示任何输出。
...
Submitted 50 Blocked for 0ms.
Submitted 51 Blocked for 0ms.
processed message 51
Submitted 52 Blocked for 0ms.
x == 50 reached, delaying for 10 seconds.
processed message 52
processed message 53
Submitted 53 Blocked for 0ms.
Submitted 54 Blocked for 0ms.
processed message 54 // when run, 10 seconds pause happens after displaying this line
processed message 50
processed message 55
Submitted 55 Blocked for 9998ms.
...
为什么Transform Block不继续填充block直到Bounded Capacity为5,而使用另外4个并行度继续处理消息?
ActionBlock
不会显示这些症状并继续处理其他可用并行线路上的消息。
无限容量 TransformBlock
也不会显示这些症状。
因为默认参数EnsureOrdered
是true
,所以它试图保持结果的顺序。也就是说,它无法继续处理超过 BoundedCapacity
,因为它需要维持秩序,这是您在测试中看到的 背压。
此外,ActionBlock
不会表现出这种行为,因为它不会输出到任何其他块(可以这么说,这是一个死胡同),因此没有 [= 的概念26=]排序,背压仅受限于边界容量和并行度.
DataflowBlockOptions.EnsureOrdered Property
By default, dataflow blocks enforce ordering on the processing of messages. Setting
EnsureOrdered
to false tells a block that it may relax this ordering if it's able to do so. This can be beneficial if making a processed result immediately available is more important than maintaining the input-to-output ordering.
解决方法是删除已排序的要求
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = 5 ,
EnsureOrdered = false
});