Many to Many TPL Dataflow does not process all inputs
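I have a TPL Dataflow pipeline with two sources and two targets linked in a many-to-many fashion. The target blocks appear to complete successfully, but they usually drop one or more inputs. I've attached the simplest complete repro I could come up with below. Any ideas?
Notes:
- The problem only occurs when an artificial delay is used while generating the inputs.
- Complete() is successfully called on both sources, but the Completion task of one of the sources hangs in the WaitingForActivation state, even though both targets have completed successfully.
- I can't find any documentation saying that many-to-many dataflows are not supported, and the answer to this question implies that they are - https://social.msdn.microsoft.com/Forums/en-US/19d831af-2d3f-4d95-9672-b28ae53e6fa0/completion-of-complex-graph-dataflowgraph-object?forum=tpldataflow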
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
    private const int NumbersPerSource = 10;
    private const int MaxDelayMilliseconds = 10;
    static async Task Main(string[] args)
    {
        int numbersProcessed = 0;
        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();
        var target1 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));
        var target2 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));
        var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
        source1.LinkTo(target1, linkOptions);
        source1.LinkTo(target2, linkOptions);
        source2.LinkTo(target1, linkOptions);
        source2.LinkTo(target2, linkOptions);
        var task1 = Task.Run(() => Post(source1));
        var task2 = Task.Run(() => Post(source2));
        // source1 or source2 Completion tasks may never complete even though Complete is always successfully called.
        //await Task.WhenAll(task1, task2, source1.Completion, source2.Completion, target1.Completion, target2.Completion);
        await Task.WhenAll(task1, task2, target1.Completion, target2.Completion);
        Console.WriteLine($"{numbersProcessed} of {NumbersPerSource * 2} numbers processed.");
    }
    private static async Task Post(BufferBlock<int> source)
    {
        foreach (var i in Enumerable.Range(0, NumbersPerSource))
        {
            await Task.Delay(TimeSpan.FromMilliseconds(GetRandomMilliseconds()));
            // Call Post outside Debug.Assert: Debug.Assert calls are compiled out of
            // Release builds, which would also remove the Post call itself.
            bool posted = source.Post(i);
            Debug.Assert(posted);
        }
        source.Complete();
    }
    private static Random Random = new Random();
    private static int GetRandomMilliseconds()
    {
        lock (Random)
        {
            return Random.Next(0, MaxDelayMilliseconds);
        }
    }
}
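As @MikeJ pointed out, linking blocks with PropagateCompletion in a many-to-many dataflow configuration can cause some target blocks to complete prematurely. In this case, as soon as either of the two source blocks completes, both target1 and target2 are marked as completed, while the other source cannot complete because it still has messages in its output buffer. Those messages are never consumed, because none of the linked target blocks is willing to accept them.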
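To see this mechanism in isolation, here is a minimal deterministic sketch (not part of the original answer; the class name PrematureCompletionDemo is hypothetical). It uses a single shared target and no artificial delays: source1 completes first, its completion is propagated to the target, and a message posted to source2 afterwards is declined and stays in source2's output buffer.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: reproduces the stuck-message scenario without random delays.
public static class PrematureCompletionDemo
{
    public static (bool TargetCompleted, int StuckInSource2, bool Source2Completed) Run()
    {
        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();
        var target = new ActionBlock<int>(_ => { });
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        source1.LinkTo(target, linkOptions);
        source2.LinkTo(target, linkOptions);

        // source1 finishes first, and its completion is propagated to the shared target.
        source1.Complete();
        target.Completion.Wait();

        // source2 still accepts a message, but the completed target declines it
        // permanently, so the message stays in source2's output buffer.
        source2.Post(42);
        source2.Complete();

        // A BufferBlock only completes once its output buffer is empty, so this wait times out.
        bool source2Completed = source2.Completion.Wait(TimeSpan.FromMilliseconds(200));
        return (target.Completion.IsCompleted, source2.Count, source2Completed);
    }
}
```

Here source2.Count stays at 1 and source2.Completion never finishes, which is the same symptom as the source Completion task stuck in WaitingForActivation in the question.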
To fix this, you can use the custom PropagateCompletion method below:
public static async void PropagateCompletion(IDataflowBlock[] sources,
    IDataflowBlock[] targets)
{
    // Arguments validation omitted
    // Wait until ALL sources have completed, whatever their final state.
    Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));
    try { await allSourcesCompletion.ConfigureAwait(false); } catch { }
    var exception = allSourcesCompletion.IsFaulted ?
        allSourcesCompletion.Exception : null;
    // Fault every target if any source faulted; otherwise (including when a
    // source was canceled) complete every target normally.
    foreach (var target in targets)
    {
        if (exception != null) target.Fault(exception); else target.Complete();
    }
}
Usage example:
source1.LinkTo(target1);
source1.LinkTo(target2);
source2.LinkTo(target1);
source2.LinkTo(target2);
PropagateCompletion(new[] { source1, source2 }, new[] { target1, target2 });
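Note that no DataflowLinkOptions are passed when linking the sources to the targets in this example, so the built-in PropagateCompletion is not used.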
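To check the fix end to end, the sketch below rebuilds the question's two-source/two-target graph, links the blocks without DataflowLinkOptions, and forwards completion with a Task-returning variant of the PropagateCompletion helper above. The names ManyToManyPipeline, PropagateCompletionAsync and RunAsync are hypothetical, and giving each producer its own seeded Random (instead of one shared, locked instance) is a simplification of mine.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ManyToManyPipeline
{
    // Hypothetical Task-returning variant of the answer's helper: same logic,
    // but the caller can await the propagation instead of using async void.
    public static async Task PropagateCompletionAsync(
        IDataflowBlock[] sources, IDataflowBlock[] targets)
    {
        Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));
        try { await allSourcesCompletion.ConfigureAwait(false); } catch { }
        var exception = allSourcesCompletion.IsFaulted ? allSourcesCompletion.Exception : null;
        foreach (var target in targets)
        {
            if (exception != null) target.Fault(exception); else target.Complete();
        }
    }

    // Rebuilds the question's graph and returns how many numbers the targets processed.
    public static async Task<int> RunAsync(int numbersPerSource, int maxDelayMs)
    {
        int processed = 0;
        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();
        var target1 = new ActionBlock<int>(_ => Interlocked.Increment(ref processed));
        var target2 = new ActionBlock<int>(_ => Interlocked.Increment(ref processed));

        // Plain links: no PropagateCompletion, so no target completes early.
        source1.LinkTo(target1);
        source1.LinkTo(target2);
        source2.LinkTo(target1);
        source2.LinkTo(target2);
        Task propagation = PropagateCompletionAsync(
            new IDataflowBlock[] { source1, source2 },
            new IDataflowBlock[] { target1, target2 });

        async Task Produce(BufferBlock<int> source, int seed)
        {
            var random = new Random(seed);
            for (int i = 0; i < numbersPerSource; i++)
            {
                await Task.Delay(random.Next(0, maxDelayMs));
                if (!source.Post(i)) throw new InvalidOperationException("Post was declined.");
            }
            source.Complete();
        }

        await Task.WhenAll(Produce(source1, 1), Produce(source2, 2));
        // With this wiring the sources' Completion tasks finish as well.
        await Task.WhenAll(propagation, source1.Completion, source2.Completion,
                           target1.Completion, target2.Completion);
        return processed;
    }
}
```

Because the targets are completed only after every source has drained its output buffer, every posted number reaches one of the ActionBlocks before they stop accepting input, so RunAsync(10, 10) should report all 20 numbers.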