链接的 ActionBlock 的 BoundedCapacity 不受尊重
BoundedCapacity of linked ActionBlock is not respected
我有一个包含两个步骤的顺序管道。
(简化示例)
第一步只是将输入的数字加 1000。
第二步简单显示数字。
var transformBlock = new TransformBlock<int, long>(StepOne, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = DataflowBlockOptions.Unbounded,
});
var actionBlock = new ActionBlock<long>(StepTwo, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2,
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
for (int i = 0; i < 100; i++)
{
transformBlock.Post(i);
}
static async Task<long> StepOne(int item)
{
await Task.Delay(500);
Console.WriteLine("transforming: " + item);
return (long)item + 1000;
}
static async Task StepTwo(long item)
{
await Task.Delay(1000);
Console.WriteLine("final product: " + item);
}
由于第 2 步比第 1 步花费的时间更长,我预计第 1 步会在一段时间后进行限制,因为它无法将结果发送到第 2 步的有界缓冲区。
预期输出:
转换:0
转换:1
最终产品:1000
转换:2
最终产品:1001
转换:3
最终产品:1002
转换:4
最终产品:1003
...
实际输出:
转换:0
转换:1
最终产品:1000
转换:2
转换:3
最终产品:1001
转换:4
变换:5
最终产品:1002
转换:6
转换:7
最终产品:1003
...
一个TransformBlock
内部维护了两个队列,一个输入队列和一个输出队列。这两个队列的大小可以通过 InputCount
and OutputCount
properties. The accumulated size of these two queues is configured by the BoundedCapacity
option, so the sum InputCount
+OutputCount
is always less than or equal to the BoundedCapacity
value. In your case the BoundedCapacity
of the block is Unbounded
随时监控,因此这两个队列的大小没有限制因素(除了一些硬限制,例如 Int32.MaxValue
可能)。链接的 ActionBlock
具有有限的有界容量这一事实在很大程度上是无关紧要的,除了延迟将转换后的值从 TransformBlock
的输出队列传输到 ActionBlock
。只有监视源块的 OutputCount
属性 和目标块的 InputCount
属性 时,才能观察到这种结果。 TransformBlock
是否未链接到任何目标块甚至都没有关系。它会很高兴地继续自己处理数字,直到达到某个硬限制,或者机器的内存耗尽。
我有一个包含两个步骤的顺序管道。
(简化示例)
第一步只是将输入的数字加 1000。 第二步简单显示数字。
var transformBlock = new TransformBlock<int, long>(StepOne, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = DataflowBlockOptions.Unbounded,
});
var actionBlock = new ActionBlock<long>(StepTwo, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 2,
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
for (int i = 0; i < 100; i++)
{
transformBlock.Post(i);
}
static async Task<long> StepOne(int item)
{
await Task.Delay(500);
Console.WriteLine("transforming: " + item);
return (long)item + 1000;
}
static async Task StepTwo(long item)
{
await Task.Delay(1000);
Console.WriteLine("final product: " + item);
}
由于第 2 步比第 1 步花费的时间更长,我预计第 1 步会在一段时间后进行限制,因为它无法将结果发送到第 2 步的有界缓冲区。
预期输出:
转换:0
转换:1
最终产品:1000
转换:2
最终产品:1001
转换:3
最终产品:1002
转换:4
最终产品:1003
...
实际输出:
转换:0
转换:1
最终产品:1000
转换:2
转换:3
最终产品:1001
转换:4
变换:5
最终产品:1002
转换:6
转换:7
最终产品:1003
...
一个TransformBlock
内部维护了两个队列,一个输入队列和一个输出队列。这两个队列的大小可以通过 InputCount
and OutputCount
properties. The accumulated size of these two queues is configured by the BoundedCapacity
option, so the sum InputCount
+OutputCount
is always less than or equal to the BoundedCapacity
value. In your case the BoundedCapacity
of the block is Unbounded
随时监控,因此这两个队列的大小没有限制因素(除了一些硬限制,例如 Int32.MaxValue
可能)。链接的 ActionBlock
具有有限的有界容量这一事实在很大程度上是无关紧要的,除了延迟将转换后的值从 TransformBlock
的输出队列传输到 ActionBlock
。只有监视源块的 OutputCount
属性 和目标块的 InputCount
属性 时,才能观察到这种结果。 TransformBlock
是否未链接到任何目标块甚至都没有关系。它会很高兴地继续自己处理数字,直到达到某个硬限制,或者机器的内存耗尽。