TPL Dataflow: TransformBlock posting to a BatchBlock followed by an ActionBlock

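I have a TPL Dataflow based application that works fine when it uses only a batch block followed by an action block.

I added a TransformBlock to transform the data from the source before it is posted to the batch block, but now my action block is never hit. No errors or exceptions are thrown.

I'm not sure whether I need to complete my transform block, as it appears to be hit only once.

Is there a step I need to add to my transform code other than returning an object of the output type?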

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        public const int BATCH_SIZE = 10;
        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            //Create the pipeline of actions
            var transformBlock = new TransformBlock<string, string>(input => TransformString(input), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var uploadFilesToAzureBlock = new ActionBlock<IEnumerable<string>>(strings => OutputStrings(strings), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

            Console.WriteLine("Blocks created");

            //link the actions
            transformBlock.LinkTo(batchBlock);
            batchBlock.LinkTo(uploadFilesToAzureBlock);
            batchBlock.Completion.ContinueWith(obj => uploadFilesToAzureBlock.Complete());

            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle",
                "Stephen",
                "Jon",
                "Conor",
                "Adrian",
                "Marty",
                "Richard",
                "Norbert",
                "Kerri",
                "Mark",
                "Declan",
                "Ray",
                "Paul",
                "Andrew",
                "Rachel",
                "David",
                "Darrell"
            };

            Console.WriteLine("Data created");

            var i = 0;
            foreach (var name in testInputs)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(name);
                i++;
            }

            batchBlock.Complete();
            uploadFilesToAzureBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            Console.ReadKey();
        }

        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");

            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }

            Console.WriteLine("Completing Batch...");
        }

        private static string TransformString(string input)
        {
            return input += " has been processed";
        }
    }
}

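As "usr" mentioned above, I was not propagating completion between the blocks. In the original code, batchBlock.Complete() was called right after posting, so the batch block could stop accepting input before the transform block had delivered its output. The version below links the blocks with PropagateCompletion = true and completes transformBlock instead, and it works perfectly.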

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        public const int BATCH_SIZE = 10;
        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            //Create the pipeline of actions
            var transformBlock = new TransformBlock<string, string>(input => TransformString(input), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var outputStringsBlock = new ActionBlock<IEnumerable<string>>(strings => OutputStrings(strings), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

            Console.WriteLine("Blocks created");

            //link the actions
            transformBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
            batchBlock.LinkTo(outputStringsBlock, new DataflowLinkOptions { PropagateCompletion = true });
            // PropagateCompletion = true already completes outputStringsBlock once batchBlock finishes,
            // so no manual ContinueWith is needed here.

            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle",
                "Stephen",
                "Jon",
                "Conor",
                "Adrian",
                "Marty",
                "Richard",
                "Norbert",
                "Kerri",
                "Mark",
                "Declan",
                "Ray",
                "Paul",
                "Andrew",
                "Rachel",
                "David",
                "Darrell"
            };

            Console.WriteLine("Data created");

            var i = 0;
            foreach (var name in testInputs)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(name);
                i++;
            }

            transformBlock.Complete();
            outputStringsBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            Console.ReadKey();
        }

        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");
            Console.WriteLine("");

            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }

            Console.WriteLine("");
            Console.WriteLine("Completing Batch...");
        }

        private static string TransformString(string input)
        {
            return input + " has been processed";
        }
    }
}
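
For anyone who wants to see the completion flow in isolation, here is a minimal sketch of the same three-block pipeline. It is not the code from my application: the PipelineSketch class, the RunAsync helper and the three sample inputs are hypothetical names made up for illustration, and it assumes C# 7.1 or later for async Main. It collects the batches into a list so that the result can be inspected after the last block has completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Hypothetical helper: pushes the inputs through transform -> batch -> action
    // and returns the batches that the final block received.
    public static async Task<List<string[]>> RunAsync(IEnumerable<string> inputs, int batchSize)
    {
        var batches = new List<string[]>();
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var transform = new TransformBlock<string, string>(s => s + " has been processed");
        var batch = new BatchBlock<string>(batchSize);
        // The ActionBlock uses the default MaxDegreeOfParallelism of 1,
        // so adding to a plain List from its delegate is safe.
        var collect = new ActionBlock<string[]>(b => batches.Add(b));

        // Completion flows from each source to its target along these links.
        transform.LinkTo(batch, linkOptions);
        batch.LinkTo(collect, linkOptions);

        foreach (var input in inputs)
        {
            transform.Post(input);
        }

        // Complete only the head of the pipeline and wait on the tail.
        // The batch block emits any partial batch when it completes.
        transform.Complete();
        await collect.Completion;

        return batches;
    }

    public static async Task Main()
    {
        var batches = await RunAsync(new[] { "a", "b", "c" }, 2);
        foreach (var b in batches)
        {
            Console.WriteLine(string.Join(", ", b));
        }
    }
}
```

The point of the sketch is the ordering: Complete() is called on the first block only, and the wait is on the last block's Completion, so nothing downstream is shut off while items are still in flight.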