Why do blocks run in this order?

Here is a short code sample to quickly introduce my problem:

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var secondBlock = new TransformBlock<int,string>(async x =>
            {
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }

                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s), new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);

            var populateTask = Task.Run(async () =>
            {
                foreach (var x in Enumerable.Range(1, 15))
                {
                    await firstBlock.SendAsync(x);
                }
            });

            populateTask.Wait();
            secondBlock.Completion.Wait();
        }
    }
}

The output is:

09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14

Why this order, and how can I change the network to get the output below?

09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
09.08.2016 15:03:13: Message is 12 (This is delayed message!)

So I am wondering why all the other blocks (or tasks here) wait for the delayed block.


UPDATE

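Since you asked me to explain my problem in more detail, I made this sample, which is closer to the real pipeline I am working on. Let's say the application downloads some data and computes a hash based on the returned response.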

using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);

                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
             {
                 using (var algorithm = SHA256.Create())
                 {
                     var bytes = Encoding.UTF8.GetBytes(x.Item2);
                     var hash = algorithm.ComputeHash(bytes);

                     return new Tuple<string, byte[]>(x.Item1, hash);
                 }
             }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";

                Console.WriteLine(output);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);
            thirdBlock.LinkTo(fourthBlock);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }

            return sb.ToString();
        }
    }
}

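Let's take a look at the order of the requests:

This definitely makes sense. All the requests are made as soon as possible. The slow fourth request is at the end of the list.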

Now let's see what output we get:

09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3

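You can see that all the hashes after the third one were computed right after the fourth response arrived.

So, based on these two facts, we can say that all the downloaded pages were waiting for the slow fourth request to finish. It would be better not to wait for the fourth request and to compute each hash as soon as its data is downloaded. Is there any way to achieve this?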

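Looking at the timestamps, the second block works as you expect: the delayed transform finishes after all the others. It seems that it is the Console.WriteLine in the ActionBlock that is not being called in the order you expect.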

Isn't secondBlock.Completion.Wait(); in your code incorrect? Shouldn't it be thirdBlock.Completion.Wait(); to get the result you expect?

This is by design and documented:

Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, ...
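
To make the consequence of that guarantee concrete, here is a minimal sketch of the reordering idea. It is a toy model under my own assumptions, not the library's actual implementation; the names ToyReorderingBuffer and ReorderingDemo are hypothetical. Results may finish in any order, but they are released strictly by input index, so one slow item holds back every result behind it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy model (not the library's ReorderingBuffer):
// results may arrive in any order but are released strictly by input index.
public sealed class ToyReorderingBuffer<T>
{
    private readonly Dictionary<long, T> _pending = new Dictionary<long, T>();
    private long _nextToRelease;

    // Stores a finished result and returns every result that can now be released in input order.
    public IReadOnlyList<T> Add(long index, T value)
    {
        _pending[index] = value;
        var released = new List<T>();
        T next;
        while (_pending.TryGetValue(_nextToRelease, out next))
        {
            _pending.Remove(_nextToRelease);
            released.Add(next);
            _nextToRelease++;
        }
        return released;
    }
}

public static class ReorderingDemo
{
    public static void Main()
    {
        var buffer = new ToyReorderingBuffer<string>();

        // Items 1 and 2 finish first, but nothing can be released before the slow item 0.
        Console.WriteLine(buffer.Add(1, "b").Count);             // prints 0
        Console.WriteLine(buffer.Add(2, "c").Count);             // prints 0

        // Once item 0 arrives, everything queued behind it is released at once.
        Console.WriteLine(string.Join(",", buffer.Add(0, "a"))); // prints a,b,c
    }
}
```

This mirrors the symptom in the question: the fast results are already computed, but they only become visible downstream when the delayed one completes.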

Proof:

// Namespaces needed: System, System.Linq, System.Threading, System.Threading.Tasks.Dataflow
var ts = Environment.TickCount;

var firstBlock = new TransformBlock<int, int>(
    x => x,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
    } );

var secondBlock = new TransformBlock<int, string>(
    x =>
    {
        var start = Environment.TickCount;

        if ( x == 3 )
        {
            Thread.Sleep( 5000 );
            return $"Start {start-ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
        }

        return $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        // limit the internal queue to 10 items
        BoundedCapacity = 10,
    } );

var thirdBlock = new ActionBlock<string>(
    s =>
    {
        Console.WriteLine( s );
    },
    new ExecutionDataflowBlockOptions
    {
        // limit to a single task to watch the order
        MaxDegreeOfParallelism = 1,
    } );

firstBlock.LinkTo( secondBlock, new DataflowLinkOptions { PropagateCompletion = true, } );
secondBlock.LinkTo( thirdBlock, new DataflowLinkOptions { PropagateCompletion = true, } );

foreach ( var x in Enumerable.Range( 1, 15 ) )
{
    // to ensure order of items
    firstBlock.Post( x );
}

firstBlock.Complete();
thirdBlock.Completion.Wait();

Output:

Start 31 Finished 31: Message is 1
Start 31 Finished 31: Message is 2
Start 31 Finished 5031: Message is 3 (This is delayed message!)
Start 31 Finished 31: Message is 4
Start 31 Finished 31: Message is 5
Start 31 Finished 31: Message is 6
Start 31 Finished 31: Message is 7
Start 31 Finished 31: Message is 8
Start 31 Finished 31: Message is 9
Start 31 Finished 31: Message is 10
Start 31 Finished 31: Message is 11
Start 31 Finished 31: Message is 12
Start 5031 Finished 5031: Message is 13
Start 5031 Finished 5031: Message is 14
Start 5031 Finished 5031: Message is 15

Solution 1

Do not use Dataflow for the download part, because the ordering guarantee blocks the processing you are looking for.

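// Namespaces needed: System, System.Linq, System.Threading, System.Threading.Tasks, System.Threading.Tasks.Dataflow
var ts = Environment.TickCount;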

var thirdBlock = new ActionBlock<string>(
    s =>
    {
        Console.WriteLine( s );
    },
    new ExecutionDataflowBlockOptions
    {
        // up to four concurrent writers, so the print order is not guaranteed
        MaxDegreeOfParallelism = 4,
    } );

Parallel.ForEach(
    Enumerable.Range( 1, 15 ),
    new ParallelOptions { MaxDegreeOfParallelism = 4, },
    x =>
    {
        var start = Environment.TickCount;
        string result;

        if ( x == 12 )
        {
            Thread.Sleep( 5000 );
            result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
        }
        else
            result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
        thirdBlock.Post( result );
    } );

thirdBlock.Complete();
thirdBlock.Completion.Wait();

Output:

Start 32 Finished 32: Message is 2
Start 32 Finished 32: Message is 6
Start 32 Finished 32: Message is 5
Start 32 Finished 32: Message is 8
Start 32 Finished 32: Message is 9
Start 32 Finished 32: Message is 10
Start 32 Finished 32: Message is 11
Start 32 Finished 32: Message is 7
Start 32 Finished 32: Message is 13
Start 32 Finished 32: Message is 14
Start 32 Finished 32: Message is 15
Start 32 Finished 32: Message is 3
Start 32 Finished 32: Message is 4
Start 32 Finished 32: Message is 1
Start 32 Finished 5032: Message is 12 (This is delayed message!)

Solution 2

Of course, you can implement IPropagatorBlock<TInput,TOutput> in a custom class that does not guarantee the order of the items.
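
One way this could look, as a rough sketch rather than a definitive implementation: instead of implementing the interface by hand, wrap an ActionBlock (which does the work in parallel) and a BufferBlock (which receives each result the moment it is ready) with DataflowBlock.Encapsulate. The helper name CreateUnorderedTransform and the demo values are assumptions made for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedBlocks
{
    // Hypothetical helper: a propagator that forwards each result as soon as it is ready,
    // without restoring the input order.
    public static IPropagatorBlock<TInput, TOutput> CreateUnorderedTransform<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform, int maxDegreeOfParallelism)
    {
        var output = new BufferBlock<TOutput>();
        var worker = new ActionBlock<TInput>(async item =>
        {
            var result = await transform(item);
            await output.SendAsync(result);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        // Complete (or fault) the output side once every input has been processed.
        worker.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)output).Fault(t.Exception);
            else output.Complete();
        });

        return DataflowBlock.Encapsulate(worker, output);
    }

    public static void Main()
    {
        // Item 2 is artificially slow; the other items should not wait for it.
        var download = CreateUnorderedTransform<int, string>(async x =>
        {
            await Task.Delay(x == 2 ? 1000 : 10);
            return $"Message is {x}";
        }, 4);

        var print = new ActionBlock<string>(s => Console.WriteLine(s));
        download.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var x in Enumerable.Range(1, 5)) download.Post(x);
        download.Complete();
        print.Completion.Wait();
    }
}
```

With these delays the message for item 2 should normally be printed last, while the order of the remaining messages can vary from run to run.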

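OK, following @SirRufo's suggestion, I started thinking about implementing my own TransformBlock that would suit my needs and process incoming items without caring about ordering. That way it would not break the network by leaving a gap between blocks in the download part, which would be an elegant approach.

So I started looking into what I could do and how. The source of TransformBlock itself seemed like a good starting point, so I opened the TransformBlock source on GitHub and started analyzing it.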

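Right at the beginning of the class I found an interesting thing:

// If parallelism is employed, we will need to support reordering messages that complete out-of-order.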
// However, a developer can override this with EnsureOrdered == false.
if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
{
    _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
}

Looks like exactly what we want! Let's look at this EnsureOrdered option in the DataflowBlockOptions class on GitHub:

/// <summary>Gets or sets whether ordered processing should be enforced on a block's handling of messages.</summary>
/// <remarks>
/// By default, dataflow blocks enforce ordering on the processing of messages. This means that a
/// block like <see cref="TransformBlock{TInput, TOutput}"/> will ensure that messages are output in the same
/// order they were input, even if parallelism is employed by the block and the processing of a message N finishes 
/// after the processing of a subsequent message N+1 (the block will reorder the results to maintain the input
/// ordering prior to making those results available to a consumer).  Some blocks may allow this to be relaxed,
/// however.  Setting <see cref="EnsureOrdered"/> to false tells a block that it may relax this ordering if
/// it's able to do so.  This can be beneficial if the immediacy of a processed result being made available
/// is more important than the input-to-output ordering being maintained.
/// </remarks>
public bool EnsureOrdered
{
    get { return _ensureOrdered; }
    set { _ensureOrdered = value; }
}

It looked really good, so I immediately switched to the IDE to set it. Unfortunately, there was no such setting:

I kept searching and found this note:

4.5.25-beta-23019

Package has been renamed to System.Threading.Tasks.Dataflow

So I googled and found this package, called System.Threading.Tasks.Dataflow! I uninstalled the Microsoft.Tpl.Dataflow package and installed System.Threading.Tasks.Dataflow by issuing the following command:

Install-Package System.Threading.Tasks.Dataflow

And there was the EnsureOrdered option. I updated the code by setting EnsureOrdered to false:

using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false };
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), options);

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);

                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, options);

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
             {
                 using (var algorithm = SHA256.Create())
                 {
                     var bytes = Encoding.UTF8.GetBytes(x.Item2);
                     var hash = algorithm.ComputeHash(bytes);

                     return new Tuple<string, byte[]>(x.Item1, hash);
                 }
             }, options);

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";

                Console.WriteLine(output);
            }, options);

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);
            thirdBlock.LinkTo(fourthBlock);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }

            return sb.ToString();
        }
    }
}

And the resulting output is exactly what I wanted:

10.08.2016 11:03:23: Hash for element #3: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #1: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #2: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #10: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #8: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #9: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #5: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #7: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #6: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:27: Hash for element #4: FD25E52B FCD8DE81 BD38E11B 13C20B96 09473283 F25346B2 04593B70 E4357BDA
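
For anyone who wants to reproduce the effect without depending on a remote server, here is a small local sketch that compares both settings. It assumes a version of the System.Threading.Tasks.Dataflow package that exposes EnsureOrdered; the class name EnsureOrderedDemo, the delays, and the item count are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EnsureOrderedDemo
{
    // Sends items 1..5 through a parallel TransformBlock in which item 2 is slow,
    // and returns the order in which the results leave the block.
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var transform = new TransformBlock<int, int>(async x =>
        {
            await Task.Delay(x == 2 ? 1000 : 10);
            return x;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = ensureOrdered
        });

        var seen = new List<int>();
        // The sink keeps the default parallelism of 1, so the list needs no locking.
        var sink = new ActionBlock<int>(x => seen.Add(x));

        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });
        foreach (var x in Enumerable.Range(1, 5)) transform.Post(x);
        transform.Complete();
        await sink.Completion;
        return seen;
    }

    public static void Main()
    {
        // Ordered: results come out in input order, so this prints 1,2,3,4,5.
        Console.WriteLine(string.Join(",", RunAsync(true).GetAwaiter().GetResult()));

        // Unordered: results come out as they finish; the slow item 2 normally arrives last.
        Console.WriteLine(string.Join(",", RunAsync(false).GetAwaiter().GetResult()));
    }
}
```

Only the ordered run has a guaranteed output order; the unordered run depends on timing, which is exactly the trade-off described in the EnsureOrdered remarks quoted above.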