Guarantee TransformBlock output sequence

From the TPL documentation:

As with ActionBlock<TInput>, TransformBlock<TInput,TOutput> defaults to processing one message at a time, maintaining strict FIFO ordering.

However, in a multithreaded scenario, i.e. if multiple threads are "simultaneously" calling SendAsync and then "awaiting" a result via ReceiveAsync, how do we guarantee that the thread that posted an item to the TransformBlock<TInput,TOutput> actually gets the result it is waiting for?

In my experiments, the way to "guarantee" the result I want seems to be adding the option BoundedCapacity = 1. At least the threads are still not blocked while sending and receiving.

If I don't do that, some threads receive a result that was meant for another thread.

Is this the right approach for this particular use case?

Here is some code that illustrates my concern:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // needed for CancellationToken
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleTransformBlock
{
    class Program
    {
        private readonly static TransformBlock<int, int> _pipeline;

        static Program()
        {

            _pipeline = new TransformBlock<int, int>(async (input) =>
            {
                await Task.Delay(RandomGen2.Next(5, 100)).ConfigureAwait(false);
                return input;

            }, 
            new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); // this is the fix???
        }


        static void Main(string[] args)
        {
            var dop = System.Environment.ProcessorCount;// 8-core


            Parallel.For(0, dop, new ParallelOptions() { MaxDegreeOfParallelism = dop },
                 (d) =>
                 {
                     DoStuff().Wait();
                 });

            Console.WriteLine("Parallel For Done ...");
            var tasks = new Task[dop];
            for (var i = 0; i < dop; i++)
            {
                var temp = i;
                tasks[temp] = Task.Factory.StartNew(
                    async () => await DoStuff().ConfigureAwait(false),
                    CancellationToken.None,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default).Unwrap();
            }

            Task.WaitAll(tasks);


        }

        private static async Task DoStuff()
        {
            for (var i = 0; i < 100; i++)
            {
                var temp = RandomGen2.Next();
                await _pipeline.SendAsync(temp).ConfigureAwait(false);
                Console.WriteLine("Just sent {0}, now waiting {1}...", new object[] { temp, System.Threading.Thread.CurrentThread.ManagedThreadId });
                await Task.Delay(RandomGen2.Next(5, 50)).ConfigureAwait(false);
                var result = await _pipeline.ReceiveAsync().ConfigureAwait(false);
                Console.WriteLine("Received {0}... {1}", new object[] { result, System.Threading.Thread.CurrentThread.ManagedThreadId });

                if (result != temp)
                {
                    var error = string.Format("************** Sent {0} But Received {1}", temp, result, System.Threading.Thread.CurrentThread.ManagedThreadId);
                    Console.WriteLine(error);
                    break;

                }

            }
        }

        /// <summary>
        /// Thread-Safe Random Generator
        /// </summary>
        public static class RandomGen2
        {
            private static Random _global = new Random();
            [ThreadStatic]
            private static Random _local;

            public static int Next()
            {
                return Next(0, int.MaxValue);
            }
            public static int Next(int max)
            {
                return Next(0, max);
            }
            public static int Next(int min, int max)
            {
                Random inst = _local;
                if (inst == null)
                {
                    int seed;
                    lock (_global) seed = _global.Next();
                    _local = inst = new Random(seed);
                }
                return inst.Next(min, max);
            }
        }
    }
}

TransformBlock already maintains FIFO order. The order in which you post items to the block is the exact order in which the items will be returned from the block.

When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.

From Dataflow (Task Parallel Library)

You can see that with this example:

private static async Task MainAsync()
{
    var transformBlock = new TransformBlock<int, int>(async input =>
    {
        await Task.Delay(RandomGen2.Next(5, 100));
        return input;
    }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 10});

    foreach (var number in Enumerable.Range(0,100))
    {
        await transformBlock.SendAsync(number);
    }

    for (int i = 0; i < 100; i++)
    {
        var result = await transformBlock.ReceiveAsync();
        Console.WriteLine(result);
    }
}

The output will be ordered 0 through 99.

However, what you seem to want is some affinity between a thread and its result, where a thread posts an item to the block and then receives that same item's result back. That doesn't really fit TPL Dataflow, which is meant to be used more as a pipeline of blocks. You can hack it with BoundedCapacity = 1, but you probably shouldn't.
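
For illustration, here is a minimal sketch (not from the original answer) of that pipeline style: the code that posts to the TransformBlock never calls ReceiveAsync itself; instead the block is linked to a downstream ActionBlock that consumes every result. The class name PipelineSketch and the doubling transform are just placeholders.

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class PipelineSketch
{
    static async Task Main()
    {
        // Transform stage: does async work on each input.
        var transform = new TransformBlock<int, int>(async input =>
        {
            await Task.Delay(10);   // simulate async work
            return input * 2;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Consumer stage: receives the results instead of the posting code.
        var print = new ActionBlock<int>(result =>
            Console.WriteLine("Result: {0}", result));

        // PropagateCompletion lets transform.Complete() flow to the ActionBlock.
        transform.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var number in Enumerable.Range(0, 20))
        {
            await transform.SendAsync(number);
        }

        transform.Complete();
        await print.Completion;
    }
}

Because the TransformBlock restores FIFO order on its output even with MaxDegreeOfParallelism = 4, the ActionBlock sees the doubled numbers in the same order they were sent.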