二分支数据流网络不完整

Two-branched data flow network does not complete

这个数据流网络有一个分叉,并产生正确的文本输出和正确的结果。为什么没有完成?

            // Connect multiple blocks
            // source -> convertToDouble -> multiply -> multiplyBuffer -> summation -> writeOut
            //                                                        |-> multiply2 -> writeListOut
            var source = new BufferBlock<List<int>>();
            var convertToDouble = new TransformBlock<List<int>, List<double>>((List<int> l) =>
            {
                return l.Select(_l => (double)_l).ToList();
            });
            source.LinkTo(convertToDouble);
            Func<List<double>, List<double>> multiplyFunc = (List<double> l) =>
            {
                return l.Select(_l => _l * 10.0).ToList();
            };
            var multiply = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            convertToDouble.LinkTo(multiply);
            var multiplyBuffer = new BroadcastBlock<List<double>>((List<double> l) =>
            {
                return l;
            });
            multiply.LinkTo(multiplyBuffer);
            var summation = new TransformBlock<List<double>, double>((List<double> l) =>
            {
                return l.Sum();
            });
            multiplyBuffer.LinkTo(summation);
            var writeOut = new ActionBlock<double>((double d) =>
            {
                Console.WriteLine("Writing out: " + d.ToString());
            });
            summation.LinkTo(writeOut);
            var multiply2 = new TransformBlock<List<double>, List<double>>(multiplyFunc);
            multiplyBuffer.LinkTo(multiply2);
            var writeListOut = new ActionBlock<List<double>>((List<double> l) =>
            {
                Console.WriteLine("Writing list out: " + string.Join(", ", l.Select(_l => 
                    _l.ToString()).ToList()));
            });
            multiply2.LinkTo(writeListOut);

            source.Post(new List<int> { 1, 2, 3 });

            Task.Run(async () =>
            {
                await Task.Delay(3000);
                Console.WriteLine("posting 2nd...");
                source.Post(new List<int> { 4, 5, 6 });
                source.Complete();
            });

            // Never completes
            try
            {
                writeOut.Completion.Wait();
                writeListOut.Completion.Wait();
            }
            catch (AggregateException ex)
            {
                ex.Handle(e =>
                {
                    Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
                    return true;
                });
            }

我注意到如果 Completion.wait() 调用被省略,那么程序 returns。执行网络时没有观察到错误。

示例输出:

写出列表:100, 200, 300 写出:60 发布第二... 写出:150 写出列表:400、500、600 (挂起)

预期输出:

写出列表:100, 200, 300 写出:60 发布第二... 写出:150 写出列表:400、500、600 (returns)

在 TPL 中,默认情况下不会将源完成传递给其他块。

您需要构造一个 System.Threading.Tasks.Dataflow.DataflowLinkOptions 并将其 PropagateCompletion 属性 设置为 true,然后将其传递到您对 LinkTo 的调用中。

或者,您可以按顺序对所有块调用 Complete 方法。