二分支数据流网络不完整
Two-branched data flow network does not complete
这个数据流网络有一个分叉,并产生正确的文本输出和正确的结果。为什么没有完成?
// Connect multiple blocks
// source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
// |-> multiply2 -> writeListOut
var source = new BufferBlock<List<int>>();
var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
{
return l.Select(_l => (double)_l).ToList();
});
source.LinkTo(convertToDouble);
Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
{
return l.Select(_l => _l * 10.0).ToList();
};
var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
convertToDouble.LinkTo(multiply);
var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
{
return l;
});
multiply.LinkTo(multiplyBuffer);
var summation = new TransformBlock<List<double>, double>((List<double> l) =>
{
return l.Sum();
});
multiplyBuffer.LinkTo(summation);
var writeOut = new ActionBlock<double>((double d) =>
{
Console.WriteLine("Writing out: " + d.ToString());
});
summation.LinkTo(writeOut);
var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
multiplyBuffer.LinkTo(multiply2);
var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
{
Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l =>
_l.ToString()).ToList()));
});
multiply2.LinkTo(writeListOut);
source.Post(new List<int> { 1, 2, 3 });
Task.Run(async () =>
{
await Task.Delay(3000);
Console.WriteLine("posting 2nd...");
source.Post(new List<int> { 4, 5, 6 });
source.Complete();
});
// Never completes
try
{
writeOut.Completion.Wait();
writeListOut.Completion.Wait();
}
catch (AggregateException ex)
{
ex.Handle(e =>
{
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
return true;
});
}
我注意到如果 Completion.wait()
调用被省略,那么程序 returns。执行网络时没有观察到错误。
示例输出:
写出列表:100, 200, 300
写出:60
发布第二...
写出:150
写出列表:400、500、600
(挂起)
预期输出:
写出列表:100, 200, 300
写出:60
发布第二...
写出:150
写出列表:400、500、600
(returns)
在 TPL 中,默认情况下不会将源完成传递给其他块。
您需要构造一个 System.Threading.Tasks.Dataflow.DataflowLinkOptions
并将其 PropagateCompletion
属性 设置为 true
,然后将其传递到您对 LinkTo
的调用中。
或者,您可以按顺序对所有块调用 Complete
方法。
这个数据流网络有一个分叉,并产生正确的文本输出和正确的结果。为什么没有完成?
// Connect multiple blocks
// source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
// |-> multiply2 -> writeListOut
var source = new BufferBlock<List<int>>();
var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
{
return l.Select(_l => (double)_l).ToList();
});
source.LinkTo(convertToDouble);
Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
{
return l.Select(_l => _l * 10.0).ToList();
};
var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
convertToDouble.LinkTo(multiply);
var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
{
return l;
});
multiply.LinkTo(multiplyBuffer);
var summation = new TransformBlock<List<double>, double>((List<double> l) =>
{
return l.Sum();
});
multiplyBuffer.LinkTo(summation);
var writeOut = new ActionBlock<double>((double d) =>
{
Console.WriteLine("Writing out: " + d.ToString());
});
summation.LinkTo(writeOut);
var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
multiplyBuffer.LinkTo(multiply2);
var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
{
Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l =>
_l.ToString()).ToList()));
});
multiply2.LinkTo(writeListOut);
source.Post(new List<int> { 1, 2, 3 });
Task.Run(async () =>
{
await Task.Delay(3000);
Console.WriteLine("posting 2nd...");
source.Post(new List<int> { 4, 5, 6 });
source.Complete();
});
// Never completes
try
{
writeOut.Completion.Wait();
writeListOut.Completion.Wait();
}
catch (AggregateException ex)
{
ex.Handle(e =>
{
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
return true;
});
}
我注意到如果 Completion.wait()
调用被省略,那么程序 returns。执行网络时没有观察到错误。
示例输出:
写出列表:100, 200, 300 写出:60 发布第二... 写出:150 写出列表:400、500、600 (挂起)
预期输出:
写出列表:100, 200, 300 写出:60 发布第二... 写出:150 写出列表:400、500、600 (returns)
在 TPL 中,默认情况下不会将源完成传递给其他块。
您需要构造一个 System.Threading.Tasks.Dataflow.DataflowLinkOptions
并将其 PropagateCompletion
属性 设置为 true
,然后将其传递到您对 LinkTo
的调用中。
或者,您可以按顺序对所有块调用 Complete
方法。