TPL Dataflow - Parallel & async processing, while keeping order

I created a TPL Dataflow pipeline that consists of 3 TransformBlocks and an ActionBlock at the end.

var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, job => job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);

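Every block fills my Job object with the processed data, because I need not only the data itself but also some general information for the response message. Basically, I pass in a path to an XML file and get back a response object with information on whether everything went fine.

If there are two or more files that take some time to read from the HDD, how can I have them read in parallel and asynchronously while keeping the order in which they came in? If file 1 takes longer, file 2 needs to wait until file 1 is finished before the data is passed to the next block. That block should then also validate the data in parallel and asynchronously, again keeping the order for the block after it.

Right now it looks like all files are processed sequentially, even though I call SendAsync on the head block.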

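EDIT: I wrote a small test class for the pipeline. It has 3 stages. What I want to achieve is that the first TransformBlock keeps reading files as they come in (SendAsync from a FileSystemWatcher) and, once they are finished, outputs them in the order in which the files came in. That means: if File1 is a big file and File2 and File3 come in, both will be read while File1 is still being processed, but File2 and File3 have to wait before they can be sent to the second TransformBlock, because File1 is still being read. Stage 2 should work the same way. Stage 3, on the other hand, needs to take the objects generated from File1 and save them to the database, which can be done in parallel and asynchronously. However, the objects from File1 need to be processed before those from File2 and File3. So the contents of each file need to be processed, as a whole, in the order in which the files came in. I tried to do this by limiting the third TransformBlock, setting both MaxDegreeOfParallelism and BoundedCapacity to 1, but that seems to fail and does not really keep the order in the Console.WriteLine output.
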
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing
{
    public class Job
    {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<Object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test
    {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job)
        {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job)
        {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job)
        {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO =>
            {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o)
        {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job)
        {
            if(job.ReturnCode == 100)
            {
                Console.WriteLine("ID {0} was successfully imported.", job.ID);

            }
            else
            {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline()
        {
            var loadXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ReadDocument(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ValidateXml(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ProcessJob(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, ActionBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, job => job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            importJob.LinkTo(reportImport);

            // Return the head of the network.
            return loadXml;
        }

        public void Start()
        {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id)
        {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel()
        {
            if(cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program
    {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args)
        {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach(var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}

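EDIT2: After I changed BoundedCapacity to Unbounded, I get everything in the order in which it was sent into the pipeline. So it did not actually go out of order before, but I guess messages were dropped?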
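
For context on the "dropped" guess: whether a full bounded block loses a message depends on how the message is offered. Post returns false immediately when the target declines, whereas SendAsync returns a task that completes once the target has accepted the message. The following is only a minimal sketch of that difference, not the pipeline from the question; the class name BoundedCapacityDemo and the gate used to keep the block busy are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    // Offers three items to a block that can hold only one item at a time.
    public static async Task<(bool FirstPost, bool SecondPost, bool Sent)> RunAsync()
    {
        // Hypothetical gate: keeps the first item "in flight" so the block stays full.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var block = new ActionBlock<int>(
            async item => { await gate.Task; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);           // accepted: the block was empty
        bool secondPost = block.Post(2);          // declined: the block is full, item 2 is not queued
        Task<bool> pending = block.SendAsync(3);  // postponed until the block has room again

        gate.SetResult(true);                     // let the in-flight item finish
        bool sent = await pending;                // completes once item 3 has been accepted

        block.Complete();
        await block.Completion;
        return (firstPost, secondPost, sent);
    }

    public static async Task Main()
    {
        var r = await RunAsync();
        Console.WriteLine($"Post #1: {r.FirstPost}, Post #2: {r.SecondPost}, SendAsync: {r.Sent}");
    }
}
```

Since the test class in the question uses SendAsync, this sketch suggests the messages were probably postponed rather than dropped, but that is an assumption about the question's run, not something the logs confirm.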

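If I make sure EnsureOrdered is true and use a MaxDegreeOfParallelism of 8 in the last TransformBlock, the ProcessJob calls run out of order, as you can see in the output below (lines marked +++). But this is exactly where the order matters, because I save the data to the database and that has to happen in the order in which it came in. It does not matter whether the items are out of order when they leave the last TransformBlock, so I guess I cannot keep parallelism here?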

ValidateXml 08:27:24.2855461 | Thread 21 is processing Job Id: 36
ValidateXml 08:27:24.2855461 | Thread 28 is processing Job Id: 37
+++ ProcessJob 08:27:24.2880490 | Thread 33 is processing Job Id: 9
ReadDocument 08:27:24.2855461 | Thread 6 is processing Job Id: 56
ValidateXml 08:27:25.2853094 | Thread 19 is processing Job Id: 38
ReadDocument 08:27:25.2853094 | Thread 13 is processing Job Id: 58
+++ ProcessJob 08:27:25.2868091 | Thread 34 is processing Job Id: 13
ReadDocument 08:27:25.2858087 | Thread 16 is processing Job Id: 59
+++ ProcessJob 08:27:25.2858087 | Thread 25 is processing Job Id: 10
+++ ProcessJob 08:27:25.2858087 | Thread 29 is processing Job Id: 12
ReadDocument 08:27:25.2853094 | Thread 11 is processing Job Id: 57
ReadDocument 08:27:25.2873097 | Thread 15 is processing Job Id: 60
ValidateXml 08:27:25.2853094 | Thread 22 is processing Job Id: 40
ValidateXml 08:27:25.2853094 | Thread 23 is processing Job Id: 39
+++ ProcessJob 08:27:25.2858087 | Thread 30 is processing Job Id: 11
ValidateXml 08:27:26.2865381 | Thread 21 is processing Job Id: 41
ReadDocument 08:27:26.2865381 | Thread 14 is processing Job Id: 61
ValidateXml 08:27:26.2865381 | Thread 20 is processing Job Id: 42
ValidateXml 08:27:26.2865381 | Thread 26 is processing Job Id: 43
ReadDocument 08:27:26.2865381 | Thread 17 is processing Job Id: 62
ReadDocument 08:27:26.2870374 | Thread 12 is processing Job Id: 63
+++ ProcessJob 08:27:26.2870374 | Thread 24 is processing Job Id: 14

EDIT3: Output after using @JSteward's latest code.

ReadDocument 09:01:03.9363340 JobId: 1
ReadDocument 09:01:03.9368357 JobId: 5
ReadDocument 09:01:03.9373347 JobId: 6
ReadDocument 09:01:03.9368357 JobId: 8
ReadDocument 09:01:03.9363340 JobId: 4
ReadDocument 09:01:03.9373347 JobId: 3
ReadDocument 09:01:03.9373347 JobId: 7
ReadDocument 09:01:03.9368357 JobId: 2
ReadDocument 09:01:05.2037570 JobId: 9
ReadDocument 09:01:05.3108413 JobId: 10
ReadDocument 09:01:05.5678177 JobId: 11
ReadDocument 09:01:05.6308763 JobId: 12
ValidateXml 09:01:05.6338782 JobId: 1
ValidateXml 09:01:06.3754174 JobId: 2
ReadDocument 09:01:06.3764184 JobId: 13
ReadDocument 09:01:06.3764184 JobId: 14
ReadDocument 09:01:07.3756634 JobId: 15
ReadDocument 09:01:07.3756634 JobId: 18
ValidateXml 09:01:07.3756634 JobId: 3
ValidateXml 09:01:07.3756634 JobId: 4
ReadDocument 09:01:07.3756634 JobId: 17
ReadDocument 09:01:07.3756634 JobId: 16
ReadDocument 09:01:08.3753887 JobId: 19
ReadDocument 09:01:08.3753887 JobId: 20
ValidateXml 09:01:08.3753887 JobId: 5
ProcessJob 09:01:08.3763906 JobId: 1
ReadDocument 09:01:09.3744411 JobId: 21
ReadDocument 09:01:09.3749410 JobId: 24
ProcessJob 09:01:09.3749410 JobId: 2
ReadDocument 09:01:09.3749410 JobId: 22
ReadDocument 09:01:09.3749410 JobId: 23
ReadDocument 09:01:10.3752061 JobId: 25
ReadDocument 09:01:10.3752061 JobId: 27
ValidateXml 09:01:10.3752061 JobId: 6
ValidateXml 09:01:10.3752061 JobId: 7
ValidateXml 09:01:10.3752061 JobId: 8
ReadDocument 09:01:10.3752061 JobId: 26
ReadDocument 09:01:11.3759294 JobId: 29
ReadDocument 09:01:11.3759294 JobId: 28
ValidateXml 09:01:11.3764278 JobId: 10
ReadDocument 09:01:11.3759294 JobId: 31
ValidateXml 09:01:11.3759294 JobId: 9
ReadDocument 09:01:11.3759294 JobId: 30
ValidateXml 09:01:12.3751553 JobId: 11
ReadDocument 09:01:12.3751553 JobId: 33
ValidateXml 09:01:12.3751553 JobId: 12
ReadDocument 09:01:12.3751553 JobId: 34
ReadDocument 09:01:12.3751553 JobId: 32
ValidateXml 09:01:13.3753842 JobId: 13
ValidateXml 09:01:13.3753842 JobId: 14
ValidateXml 09:01:13.3753842 JobId: 16
ReadDocument 09:01:13.3753842 JobId: 35
ReadDocument 09:01:13.3753842 JobId: 36
ValidateXml 09:01:13.3753842 JobId: 15
ReadDocument 09:01:14.3756414 JobId: 37
ValidateXml 09:01:14.3756414 JobId: 19
ValidateXml 09:01:14.3756414 JobId: 18
ValidateXml 09:01:14.3756414 JobId: 17
ReadDocument 09:01:14.3756414 JobId: 40
ReadDocument 09:01:14.3756414 JobId: 38
ReadDocument 09:01:14.3756414 JobId: 39
ProcessJob 09:01:14.3761419 JobId: 3
SendToDataBase 09:01:14.3806453 JobId: 1
SendToDataBase 09:01:14.3821472 JobId: 2
ProcessJob 09:01:14.3821472 JobId: 4
ValidateXml 09:01:15.3763758 JobId: 20
ReadDocument 09:01:15.3763758 JobId: 42
ValidateXml 09:01:15.3763758 JobId: 21
ReadDocument 09:01:15.3773772 JobId: 43
ReadDocument 09:01:15.3763758 JobId: 41
ValidateXml 09:01:15.3768800 JobId: 22
ReadDocument 09:01:15.3773772 JobId: 44
ValidateXml 09:01:16.3761117 JobId: 23
ValidateXml 09:01:16.3761117 JobId: 26
ValidateXml 09:01:16.3761117 JobId: 24
ValidateXml 09:01:16.3761117 JobId: 25
ReadDocument 09:01:16.3761117 JobId: 45
ReadDocument 09:01:16.3761117 JobId: 46
ProcessJob 09:01:16.3761117 JobId: 5
ReadDocument 09:01:17.3758334 JobId: 47
ValidateXml 09:01:17.3763315 JobId: 28
ValidateXml 09:01:17.3763315 JobId: 27
ReadDocument 09:01:17.3763315 JobId: 49
ReadDocument 09:01:17.3763315 JobId: 48
ProcessJob 09:01:17.3763315 JobId: 6
ValidateXml 09:01:17.3763315 JobId: 29
ReadDocument 09:01:17.3763315 JobId: 50
ReadDocument 09:01:18.3755786 JobId: 51
ReadDocument 09:01:18.3755786 JobId: 52
<<<
ProcessJob 09:01:18.3770792 JobId: 10
ProcessJob 09:01:18.3770792 JobId: 9
ProcessJob 09:01:18.3755786 JobId: 7
>>>
ReadDocument 09:01:18.3755786 JobId: 53
ValidateXml 09:01:18.3755786 JobId: 32
ValidateXml 09:01:18.3755786 JobId: 31
ValidateXml 09:01:18.3755786 JobId: 30
ReadDocument 09:01:18.3760794 JobId: 54
ProcessJob 09:01:18.3755786 JobId: 8
ValidateXml 09:01:19.3753274 JobId: 34
ValidateXml 09:01:19.3753274 JobId: 33
ReadDocument 09:01:19.3758261 JobId: 56
ReadDocument 09:01:19.3758261 JobId: 55
ValidateXml 09:01:19.3758261 JobId: 35
ValidateXml 09:01:20.3752782 JobId: 36
ValidateXml 09:01:20.3752782 JobId: 37
ProcessJob 09:01:20.3757709 JobId: 11
ReadDocument 09:01:20.3752782 JobId: 57
ValidateXml 09:01:20.3752782 JobId: 38
ReadDocument 09:01:20.3757709 JobId: 58
ReadDocument 09:01:20.3757709 JobId: 59
ProcessJob 09:01:21.3757202 JobId: 12
ValidateXml 09:01:21.3757202 JobId: 39
ReadDocument 09:01:21.3757202 JobId: 62
ReadDocument 09:01:21.3757202 JobId: 61
ReadDocument 09:01:21.3757202 JobId: 60
ReadDocument 09:01:22.3764154 JobId: 63
ReadDocument 09:01:22.3764154 JobId: 64
ReadDocument 09:01:22.3764154 JobId: 65
ProcessJob 09:01:22.3794167 JobId: 16
ValidateXml 09:01:22.3764154 JobId: 40
ValidateXml 09:01:22.3764154 JobId: 42
ReadDocument 09:01:22.3764154 JobId: 66
ValidateXml 09:01:22.3774149 JobId: 43
ProcessJob 09:01:22.3764154 JobId: 13
ValidateXml 09:01:22.3764154 JobId: 41
ProcessJob 09:01:22.3779160 JobId: 15
SendToDataBase 09:01:22.3784159 JobId: 3
ProcessJob 09:01:22.3764154 JobId: 14
ValidateXml 09:01:22.3859209 JobId: 44
SendToDataBase 09:01:22.4309993 JobId: 4
SendToDataBase 09:01:22.4460051 JobId: 5
SendToDataBase 09:01:22.4465047 JobId: 6
ReadDocument 09:01:23.3760112 JobId: 67
ValidateXml 09:01:23.3760112 JobId: 46
ValidateXml 09:01:23.3760112 JobId: 47
ReadDocument 09:01:23.3760112 JobId: 68
ValidateXml 09:01:23.3760112 JobId: 45
ProcessJob 09:01:23.3760112 JobId: 17
ValidateXml 09:01:24.3762581 JobId: 48
ReadDocument 09:01:24.3762581 JobId: 69
ProcessJob 09:01:24.3762581 JobId: 18
ProcessJob 09:01:24.3762581 JobId: 19
ReadDocument 09:01:24.3762581 JobId: 70
CreateResponse 09:01:24.3777606 JobId: 58
CreateResponse 09:01:24.3994684 JobId: 59
CreateResponse 09:01:24.4059908 JobId: 60
CreateResponse 09:01:24.4114777 JobId: 61
CreateResponse 09:01:24.4134789 JobId: 62
ValidateXml 09:01:25.3759607 JobId: 49
ValidateXml 09:01:25.3759607 JobId: 51
ProcessJob 09:01:25.3784627 JobId: 22
ValidateXml 09:01:25.3759607 JobId: 52
ProcessJob 09:01:25.3759607 JobId: 20
ValidateXml 09:01:25.3774629 JobId: 53
ValidateXml 09:01:25.3759607 JobId: 50
ValidateXml 09:01:25.3774629 JobId: 54
ReadDocument 09:01:25.3759607 JobId: 72
ReadDocument 09:01:25.3774629 JobId: 73
ReadDocument 09:01:25.3759607 JobId: 71
ReadDocument 09:01:25.3779625 JobId: 74
ProcessJob 09:01:25.3759607 JobId: 21
SendToDataBase 09:01:25.3774629 JobId: 7
CreateResponse 09:01:25.3759607 JobId: 39
SendToDataBase 09:01:25.4398495 JobId: 8
SendToDataBase 09:01:25.4448555 JobId: 9
SendToDataBase 09:01:25.4478565 JobId: 10
SendToDataBase 09:01:25.4483570 JobId: 11
CreateResponse 09:01:25.4448555 JobId: 42
CreateResponse 09:01:25.4608868 JobId: 43
SendToDataBase 09:01:25.4553682 JobId: 12
CreateResponse 09:01:25.4613665 JobId: 44
CreateResponse 09:01:25.4698849 JobId: 45
ReadDocument 09:01:26.3754874 JobId: 75
ReadDocument 09:01:26.3754874 JobId: 76
ReadDocument 09:01:26.3754874 JobId: 78
ValidateXml 09:01:26.3754874 JobId: 55
ProcessJob 09:01:26.3759876 JobId: 24
ProcessJob 09:01:26.3754874 JobId: 23
ReadDocument 09:01:26.3754874 JobId: 77
SendToDataBase 09:01:26.3759876 JobId: 13
SendToDataBase 09:01:26.3980055 JobId: 14
SendToDataBase 09:01:26.3985045 JobId: 15
SendToDataBase 09:01:26.4020099 JobId: 16
ReadDocument 09:01:27.3762164 JobId: 79
ValidateXml 09:01:27.3762164 JobId: 56
ProcessJob 09:01:27.3762164 JobId: 26
ReadDocument 09:01:27.3762164 JobId: 82
ProcessJob 09:01:27.3762164 JobId: 25
ReadDocument 09:01:27.3762164 JobId: 81
ReadDocument 09:01:27.3762164 JobId: 80
ValidateXml 09:01:27.3762164 JobId: 63
ValidateXml 09:01:27.3777165 JobId: 64
ProcessJob 09:01:27.3767157 JobId: 27
ValidateXml 09:01:27.3762164 JobId: 57
SendToDataBase 09:01:27.3777165 JobId: 17
SendToDataBase 09:01:27.4327571 JobId: 18
SendToDataBase 09:01:27.4357587 JobId: 19
ReadDocument 09:01:28.3761410 JobId: 83
ProcessJob 09:01:28.3761410 JobId: 28
ProcessJob 09:01:28.3761410 JobId: 29
ValidateXml 09:01:28.3761410 JobId: 66
SendToDataBase 09:01:28.3761410 JobId: 20
ProcessJob 09:01:28.3761410 JobId: 30
ValidateXml 09:01:28.3761410 JobId: 67
ValidateXml 09:01:28.3761410 JobId: 65
SendToDataBase 09:01:28.3861483 JobId: 21
SendToDataBase 09:01:28.4141687 JobId: 22
ReadDocument 09:01:28.6079764 JobId: 84
ReadDocument 09:01:28.6552491 JobId: 85
ReadDocument 09:01:28.7047606 JobId: 86
ValidateXml 09:01:28.7327861 JobId: 68
ProcessJob 09:01:28.7327861 JobId: 31
ReadDocument 09:01:29.1285484 JobId: 87
ProcessJob 09:01:29.1894672 JobId: 32
SendToDataBase 09:01:29.1894672 JobId: 23
SendToDataBase 09:01:29.1944706 JobId: 24
ReadDocument 09:01:29.3910070 JobId: 88
ValidateXml 09:01:29.5569691 JobId: 69
ReadDocument 09:01:29.5995036 JobId: 89
ValidateXml 09:01:29.6085095 JobId: 70
ReadDocument 09:01:29.6581266 JobId: 90
ValidateXml 09:01:29.8797899 JobId: 71
ValidateXml 09:01:30.1244519 JobId: 72
ValidateXml 09:01:30.1584763 JobId: 73
ReadDocument 09:01:30.2100312 JobId: 91
ProcessJob 09:01:30.2490536 JobId: 33
ProcessJob 09:01:30.2950865 JobId: 34
ReadDocument 09:01:30.3290995 JobId: 92
ProcessJob 09:01:30.3636350 JobId: 35
SendToDataBase 09:01:30.3636350 JobId: 25
SendToDataBase 09:01:30.3701300 JobId: 26
SendToDataBase 09:01:30.3706299 JobId: 27
ProcessJob 09:01:30.4987430 JobId: 36
ReadDocument 09:01:30.5642707 JobId: 93
ReadDocument 09:01:30.6088035 JobId: 94
ValidateXml 09:01:30.7213868 JobId: 74
ReadDocument 09:01:30.7544106 JobId: 95
ProcessJob 09:01:30.7544106 JobId: 37
SendToDataBase 09:01:30.7544106 JobId: 28
ProcessJob 09:01:31.1091681 JobId: 38
SendToDataBase 09:01:31.1091681 JobId: 29
SendToDataBase 09:01:31.1151730 JobId: 30
ValidateXml 09:01:31.2012468 JobId: 75
ValidateXml 09:01:31.2827940 JobId: 76
ValidateXml 09:01:31.3143168 JobId: 77
ValidateXml 09:01:31.4073842 JobId: 78
ReadDocument 09:01:31.4369059 JobId: 96
ReadDocument 09:01:31.4699302 JobId: 97
ProcessJob 09:01:31.7201123 JobId: 40
SendToDataBase 09:01:31.7201123 JobId: 31
ProcessJob 09:01:32.1569310 JobId: 41
SendToDataBase 09:01:32.1569310 JobId: 32
ValidateXml 09:01:32.3650822 JobId: 79
ValidateXml 09:01:32.3650822 JobId: 80
ProcessJob 09:01:32.3966047 JobId: 46
ReadDocument 09:01:32.4236247 JobId: 98
ReadDocument 09:01:32.4831869 JobId: 99
ValidateXml 09:01:32.5607342 JobId: 81
ReadDocument 09:01:32.5777363 JobId: 100
ProcessJob 09:01:33.1461630 JobId: 47
ProcessJob 09:01:33.2081967 JobId: 48
SendToDataBase 09:01:33.2081967 JobId: 33
SendToDataBase 09:01:33.2137015 JobId: 34
SendToDataBase 09:01:33.2172021 JobId: 35
ValidateXml 09:01:33.2347146 JobId: 82
ValidateXml 09:01:33.4228519 JobId: 83
ProcessJob 09:01:33.4228519 JobId: 49
ValidateXml 09:01:33.4373638 JobId: 84
ProcessJob 09:01:33.4878995 JobId: 50
SendToDataBase 09:01:33.4878995 JobId: 36
ProcessJob 09:01:33.5819674 JobId: 51
ValidateXml 09:01:33.6239992 JobId: 85
ProcessJob 09:01:33.6239992 JobId: 52
SendToDataBase 09:01:33.6239992 JobId: 37
SendToDataBase 09:01:33.6295082 JobId: 38
ValidateXml 09:01:33.6870563 JobId: 86
ValidateXml 09:01:33.7125626 JobId: 87
ProcessJob 09:01:34.1238635 JobId: 53
ProcessJob 09:01:34.5796949 JobId: 54
<<<
SendToDataBase 09:01:34.5796949 JobId: 40
SendToDataBase 09:01:34.5856995 JobId: 41
SendToDataBase 09:01:34.5887008 JobId: 46
>>>
ValidateXml 09:01:34.7951688 JobId: 88
ValidateXml 09:01:34.9162007 JobId: 89
ProcessJob 09:01:34.9541705 JobId: 55
ValidateXml 09:01:35.0464443 JobId: 90
ProcessJob 09:01:35.3634898 JobId: 56
ProcessJob 09:01:35.3795024 JobId: 57
ValidateXml 09:01:35.5165095 JobId: 91
ValidateXml 09:01:35.8614345 JobId: 92
ProcessJob 09:01:35.9985415 JobId: 63
ValidateXml 09:01:36.0481807 JobId: 93
ProcessJob 09:01:36.0763064 JobId: 64
ProcessJob 09:01:36.0993229 JobId: 65
SendToDataBase 09:01:36.0993229 JobId: 47
SendToDataBase 09:01:36.1048270 JobId: 48
ValidateXml 09:01:36.1572079 JobId: 94
ValidateXml 09:01:36.3791015 JobId: 95
ProcessJob 09:01:36.4212607 JobId: 66
SendToDataBase 09:01:36.4212607 JobId: 49
SendToDataBase 09:01:36.4267655 JobId: 50
SendToDataBase 09:01:36.4272654 JobId: 51
SendToDataBase 09:01:36.4322913 JobId: 52
SendToDataBase 09:01:36.4327837 JobId: 53
ProcessJob 09:01:36.5149796 JobId: 67
SendToDataBase 09:01:36.5149796 JobId: 54
ValidateXml 09:01:36.6861048 JobId: 96
ValidateXml 09:01:36.7845716 JobId: 97
ValidateXml 09:01:37.0175979 JobId: 98
ValidateXml 09:01:37.3788835 JobId: 99
ValidateXml 09:01:37.6477046 JobId: 100
ProcessJob 09:01:37.8269808 JobId: 68
SendToDataBase 09:01:37.8269808 JobId: 55
ProcessJob 09:01:37.8940108 JobId: 69
ProcessJob 09:01:38.2955556 JobId: 70
ProcessJob 09:01:38.3110583 JobId: 71
SendToDataBase 09:01:38.3110583 JobId: 56
SendToDataBase 09:01:38.3125586 JobId: 57
CreateResponse 09:01:38.4551538 JobId: 95
CreateResponse 09:01:38.4925304 JobId: 96
ProcessJob 09:01:38.5382532 JobId: 72
ProcessJob 09:01:38.9129894 JobId: 73
SendToDataBase 09:01:38.9129894 JobId: 63
SendToDataBase 09:01:38.9185062 JobId: 64
SendToDataBase 09:01:38.9189949 JobId: 65
ProcessJob 09:01:38.9852121 JobId: 74
ProcessJob 09:01:39.0317458 JobId: 75
SendToDataBase 09:01:39.0317458 JobId: 66
SendToDataBase 09:01:39.0377511 JobId: 67
ProcessJob 09:01:39.6129381 JobId: 76
SendToDataBase 09:01:39.6129381 JobId: 68
ProcessJob 09:01:39.7833004 JobId: 77
SendToDataBase 09:01:39.7833004 JobId: 69
ProcessJob 09:01:39.8740443 JobId: 78
ProcessJob 09:01:40.3145731 JobId: 79
SendToDataBase 09:01:40.3145731 JobId: 70
SendToDataBase 09:01:40.3205708 JobId: 71
ProcessJob 09:01:40.4912084 JobId: 80
ProcessJob 09:01:40.5307205 JobId: 81
SendToDataBase 09:01:40.5317212 JobId: 72
ProcessJob 09:01:40.5652454 JobId: 82
ProcessJob 09:01:41.2902736 JobId: 83
ProcessJob 09:01:41.2902736 JobId: 84
ProcessJob 09:01:41.3598244 JobId: 85
SendToDataBase 09:01:41.3598244 JobId: 73
SendToDataBase 09:01:41.3663284 JobId: 74
SendToDataBase 09:01:41.3713317 JobId: 75
SendToDataBase 09:01:41.3718392 JobId: 76
SendToDataBase 09:01:41.3723328 JobId: 77
ProcessJob 09:01:42.2677493 JobId: 86
SendToDataBase 09:01:42.2677493 JobId: 78
ProcessJob 09:01:42.6466081 JobId: 87
ProcessJob 09:01:42.8947969 JobId: 88
SendToDataBase 09:01:42.8947969 JobId: 79
ProcessJob 09:01:43.0012509 JobId: 89
ProcessJob 09:01:43.1513589 JobId: 90
ProcessJob 09:01:43.4545800 JobId: 91
SendToDataBase 09:01:43.4545800 JobId: 80
SendToDataBase 09:01:43.4600832 JobId: 81
SendToDataBase 09:01:43.4605919 JobId: 82
ProcessJob 09:01:43.5946813 JobId: 92
ProcessJob 09:01:44.1731027 JobId: 93
SendToDataBase 09:01:44.1731027 JobId: 83
SendToDataBase 09:01:44.1786068 JobId: 84
SendToDataBase 09:01:44.1816090 JobId: 85
ProcessJob 09:01:44.4678171 JobId: 94
SendToDataBase 09:01:44.4678171 JobId: 86
ProcessJob 09:01:45.3426043 JobId: 97
SendToDataBase 09:01:45.3426043 JobId: 87
ProcessJob 09:01:45.3751270 JobId: 98
ProcessJob 09:01:45.7363757 JobId: 99
ProcessJob 09:01:45.7809216 JobId: 100
SendToDataBase 09:01:45.7809216 JobId: 88
SendToDataBase 09:01:45.7879270 JobId: 89
SendToDataBase 09:01:45.7925566 JobId: 90
SendToDataBase 09:01:45.8776726 JobId: 91
SendToDataBase 09:01:45.8776726 JobId: 92
SendToDataBase 09:01:46.5813640 JobId: 93
SendToDataBase 09:01:46.5813640 JobId: 94
SendToDataBase 09:01:47.7407165 JobId: 97
SendToDataBase 09:01:47.7407165 JobId: 98
SendToDataBase 09:01:48.4382058 JobId: 99
SendToDataBase 09:01:48.7357557 JobId: 100

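You can do this if you link a TransformBlock to an ActionBlock.

This is easiest to demonstrate with a compilable console application.

The application processes a sequence of ints, but you can replace the ints with your own unit-of-work class.

(I adapted this code from a utility I wrote that does multithreaded file compression with the relatively slow LZMA compression algorithm. The utility had to read the input data sequentially from a file, pass it in blocks to a queue where multiple threads process the data in arbitrary order, and finally output the compressed blocks to a queue that had to preserve the original order of the data blocks.)

Sample code: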

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    class Program
    {
        public static void Main()
        {
            var data = Enumerable.Range(1, 100);
            var task = Process(data);

            Console.WriteLine("Waiting for task to complete");
            task.Wait();
            Console.WriteLine("Task complete.");
        }

        public static async Task Process(IEnumerable<int> data)
        {
            var queue = new TransformBlock<int, int>(block => process(block), transformBlockOptions());
            var writer = new ActionBlock<int>(block => write(block), actionBlockOptions());

            queue.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

            await enqueDataToProcessAndAwaitCompletion(data, queue);

            await writer.Completion;
        }

        static int process(int block)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing block {block}");
            emulateWorkload();
            return -block;
        }

        static void write(int block)
        {
            Console.WriteLine("Output: " + block);
        }

        static async Task enqueDataToProcessAndAwaitCompletion(IEnumerable<int> data, TransformBlock<int, int> queue)
        {
            await enqueueDataToProcess(data, queue);
            queue.Complete();
        }

        static async Task enqueueDataToProcess(IEnumerable<int> data, ITargetBlock<int> queue)
        {
            foreach (var item in data)
                await queue.SendAsync(item);
        }


        static ExecutionDataflowBlockOptions transformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        static Random rng = new Random();
        static object locker = new object();

        static void emulateWorkload()
        {
            int delay;

            lock (locker)
            {
                delay = rng.Next(250, 750);
            }

            Thread.Sleep(delay);
        }
    }
}

Output:

Waiting for task to complete
Thread 8 is processing block 8
Thread 5 is processing block 2
Thread 6 is processing block 6
Thread 4 is processing block 5
Thread 7 is processing block 7
Thread 10 is processing block 4
Thread 9 is processing block 1
Thread 3 is processing block 3
Thread 3 is processing block 9
Thread 8 is processing block 10
Thread 5 is processing block 11
Thread 6 is processing block 12
Thread 9 is processing block 13
Thread 10 is processing block 14
Thread 7 is processing block 15
Thread 8 is processing block 16
Thread 4 is processing block 17
Thread 5 is processing block 18
Thread 3 is processing block 19
Thread 9 is processing block 20
Thread 8 is processing block 21
Output: -1
Output: -2
Output: -3
Output: -4
Output: -5
Output: -6
Output: -7
Output: -8
Output: -9
Output: -10
Output: -11
Output: -12
Output: -13
Thread 6 is processing block 22
Thread 10 is processing block 23
Output: -14
Thread 7 is processing block 24
Output: -15
Output: -16
Thread 6 is processing block 25
Output: -17
Thread 4 is processing block 26
Thread 5 is processing block 27
----------------->SNIP<-----------------
Thread 10 is processing block 93
Thread 8 is processing block 94
Output: -83
Thread 4 is processing block 95
Output: -84
Output: -85
Output: -86
Output: -87
Thread 3 is processing block 96
Output: -88
Thread 6 is processing block 97
Thread 5 is processing block 98
Thread 10 is processing block 99
Thread 9 is processing block 100
Output: -89
Output: -90
Output: -91
Output: -92
Output: -93
Output: -94
Output: -95
Output: -96
Output: -97
Output: -98
Output: -99
Output: -100
Task complete.
Press any key to continue . . .

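Note how the "blocks" are processed by multiple threads in arbitrary order, but the output order is the same as the input order.

It is important to set the options for the output action block as shown in the actionBlockOptions() method, with MaxDegreeOfParallelism and BoundedCapacity both set to 1.

This is what causes the output to be serialized in the correct order. If you set BoundedCapacity and MaxDegreeOfParallelism to values greater than 1, the output may be written in the wrong order.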
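
As a variation on the sample above, the same ordering can be checked with an asynchronous transform delegate instead of a blocking one. This is only a sketch under a few assumptions: the class name OrderedAsyncDemo is hypothetical, Task.Delay stands in for real I/O, and the delays are chosen so that later items finish first. The output block relies on the default MaxDegreeOfParallelism of 1, so it handles one item at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderedAsyncDemo
{
    // Sends 0..count-1 through a parallel, asynchronous TransformBlock and
    // returns the items in the order in which the output block received them.
    public static async Task<List<int>> RunAsync(int count)
    {
        var results = new List<int>();

        var transform = new TransformBlock<int, int>(
            async n =>
            {
                // Simulated I/O: earlier items take longer, so later items finish first.
                await Task.Delay((count - n) * 10);
                return n;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Default options: one item at a time, so List<int>.Add is never called concurrently.
        var collect = new ActionBlock<int>(n => results.Add(n));

        transform.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
            await transform.SendAsync(i);

        transform.Complete();
        await collect.Completion;
        return results;
    }

    public static async Task Main()
    {
        var results = await RunAsync(8);
        Console.WriteLine(string.Join(", ", results));
    }
}
```

Even though the items complete in roughly reverse order inside the TransformBlock, the block emits them in input order before they reach the single-threaded output block.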

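@Matthew Watson has a good suggestion. I just want to point out that there is no need to limit the final action block to a MaxDegreeOfParallelism and BoundedCapacity of 1 unless you are using the Microsoft.Tpl.Dataflow package. The newer and correct System.Threading.Tasks.Dataflow package adds the EnsureOrdered property to the execution block options. Although this does not seem to be documented on MSDN, you can find the property and its usage in the TPL Dataflow source.

Here is a sample and a test that demonstrate this behavior. Changing EnsureOrdered in the execution options to false will cause the test to fail. The default value is true, so it does not need to be set explicitly for ordered behavior.

Edit: As @Matthew Watson points out below, while EnsureOrdered keeps things in order between propagating blocks, once inside the action block the messages can be processed in any order if its MaxDegreeOfParallelism is greater than 1.

Edit2: Note: if the ActionBlock has MaxDegreeOfParallelism and BoundedCapacity set to 1 but EnsureOrdered is false, the test will fail and the results will be out of order.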

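using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;

[TestFixture]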
public class TestRunner {

    [Test]
    public void TestPipeline() {
        var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList();

        var target = new MyDataflow();
        target.PostData(data).Wait();

        Assert.IsTrue(data.SequenceEqual(target.OutputMessages));
    }
}

public class MyDataflow {

    private static Random rnd = new Random();

    private BufferBlock<Message> buffer;
    private TransformBlock<Message, Message> xForm1;
    private ActionBlock<Message> action;
    public IList<Message> OutputMessages { get; set; }

    public MyDataflow() {
        OutputMessages = new List<Message>();
        CreatePipeline();
        LinkPipeline();
    }

    public void CreatePipeline() {
        var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 2,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        buffer = new BufferBlock<Message>();

        xForm1 = new TransformBlock<Message, Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}");
            return x;
        }, options);

        action = new ActionBlock<Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Output  Id: {x.Id} Value: {x.Value}");

            //this delay will cause the messages to be unordered
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            OutputMessages.Add(x);
        }, options);
    }

    public void LinkPipeline() {
        var options = new DataflowLinkOptions() {
            PropagateCompletion = true
        };

        buffer.LinkTo(xForm1, options);
        xForm1.LinkTo(action, options);
    }

    public Task PostData(IEnumerable<Message> data) {

        foreach (var item in data) {
            buffer.Post(item);
        }
        buffer.Complete();
        return action.Completion;
    }
}

public class Message {
    public Message(int id, int value) {
        this.Id = id;
        this.Value = value;
    }
    public int Id { get; set; }
    public int Value { get; set; }
}
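To isolate the effect of EnsureOrdered from the rest of the test, here is a smaller sketch. It assumes a version of System.Threading.Tasks.Dataflow that exposes EnsureOrdered; the class name EnsureOrderedSketch and the delay pattern are made up for illustration. Later inputs are given shorter delays so that they tend to finish first.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EnsureOrderedSketch
{
    // Posts 0..9 to a parallel TransformBlock and returns the values in the
    // order in which the block emitted them.
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var block = new TransformBlock<int, int>(async n =>
        {
            // Hypothetical workload: item 0 is the slowest, item 9 the fastest.
            await Task.Delay((10 - n) * 20);
            return n;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = ensureOrdered
        });

        for (int i = 0; i < 10; i++)
            block.Post(i);
        block.Complete();

        // Drain the block from a single consumer so that the list reflects
        // the emission order exactly.
        var results = new List<int>();
        while (await block.OutputAvailableAsync())
            results.Add(await block.ReceiveAsync());

        return results;
    }
}
```

With ensureOrdered set to true the result is 0 through 9 in input order. With false the block is free to emit items as they complete, so the order typically differs, although no particular order is guaranteed.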

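Edit: Unfortunately, we cannot access the internal ReorderingBuffer directly. So an alternative to an ActionBlock with BoundedCapacity and MaxDegreeOfParallelism equal to 1 is to link the TransformBlock's ordered output to a stream (an IObservable<Message> obtained with AsObservable). Note that in the code above, the delay in the parallel-enabled ActionBlock causes the results to come out unordered, whereas in the code below, a delay in the stream processing does not disturb the order. In essence, this provides the same behavior as a synchronous ActionBlock, and it can feed another part of the mesh, and so on.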

[TestFixture]
public class TestRunner {

    [Test]
    public void TestPipeline() {
        var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList();

        var target = new MyDataflow();
        target.PostData(data).Wait();

        Assert.IsTrue(data.SequenceEqual(target.OutputMessages));
    }
}

public class MyDataflow {

    private static Random rnd = new Random();

    private BufferBlock<Message> buffer;
    private TransformBlock<Message, Message> xForm1;
    private IObservable<Message> output;
    private TaskCompletionSource<bool> areWeDoneYet;
    public IList<Message> OutputMessages { get; set; }

    public MyDataflow() {
        OutputMessages = new List<Message>();
        CreatePipeline();
        LinkPipeline();
    }

    public void CreatePipeline() {
        var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 13,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        buffer = new BufferBlock<Message>();

        xForm1 = new TransformBlock<Message, Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}");
            return x;
        }, options);

        output = xForm1.AsObservable<Message>();

        areWeDoneYet = new TaskCompletionSource<bool>();
    }

    public void LinkPipeline() {
        var options = new DataflowLinkOptions() {
            PropagateCompletion = true
        };

        buffer.LinkTo(xForm1, options);

        var subscription = output.Subscribe(msg => {
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            OutputMessages.Add(msg);
        }, () => areWeDoneYet.SetResult(true));            
    }

    public Task<bool> PostData(IEnumerable<Message> data) {            
        foreach (var item in data) {
            buffer.Post(item);
        }
        buffer.Complete();
        return areWeDoneYet.Task;
    }
}

public class Message {
    public Message(int id, int value) {
        this.Id = id;
        this.Value = value;
    }
    public int Id { get; set; }
    public int Value { get; set; }
}

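Edit2: "Also, my pipeline is supposed to have 3 stages; how can I link those, so that when the first block has finished processing the first file, it starts outputting data to the next block, which again works in parallel and asynchronously?"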

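That is driven not by how the blocks are linked but by the ExecutionDataflowBlockOptions. With the options shown below, the first block processes jobs according to how many files have been posted and how long each one takes. As they finish, they are output to the next processing stage or to your fault-handling ActionBlock, depending on your Job.ReturnCode predicate, and the next stage follows the same rules. You can also modify the ActionBlock options to handle multiple successes/failures from the TransformBlocks.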

var options = new ExecutionDataflowBlockOptions() {
    BoundedCapacity = 10,
    MaxDegreeOfParallelism = 10,
    EnsureOrdered = true
};
var loadXml = new TransformBlock<Job, Job>(job => { ... }, options); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }, options); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }, options); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);
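One caveat about the unconditional fallback links, offered as a hedged observation rather than as part of the original answer: when a bounded target is temporarily full it postpones the message, and the source then offers the same message to the next linked target. An unconditional failure link can therefore receive a job that actually succeeded. A sketch of one way around this is to give the failure link an explicit complementary predicate. The class name ExclusiveRoutingSketch is made up, and the sign of an int stands in for the ReturnCode check.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ExclusiveRoutingSketch
{
    // Routes non-negative values to a slow, bounded "import" block and
    // negative values to a "failure" block.
    public static async Task<(List<int> Imported, List<int> Failed)> RunAsync(
        IEnumerable<int> inputs)
    {
        var imported = new List<int>();
        var failed = new List<int>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var validate = new TransformBlock<int, int>(n => n,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Slow and bounded, so it is frequently full and postpones messages.
        var import = new ActionBlock<int>(n =>
        {
            Thread.Sleep(5); // simulated slow import
            imported.Add(n);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 1
        });

        var failure = new ActionBlock<int>(n => failed.Add(n));

        // Complementary predicates: a postponed success is never offered
        // to the failure block as an acceptable message.
        validate.LinkTo(import, link, n => n >= 0);
        validate.LinkTo(failure, link, n => n < 0);

        foreach (var n in inputs)
            await validate.SendAsync(n);
        validate.Complete();

        await Task.WhenAll(import.Completion, failure.Completion);
        return (imported, failed);
    }
}
```

The trade-off is that a message matching neither predicate would stay at the head of the source block's output and stall it, so the two predicates need to cover every case between them.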

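Edit3, in response to the source code added by the OP: By setting MaxDegreeOfParallelism and BoundedCapacity to 1, you lost the ordered behavior in the last transform block. Let me be clear: there is nothing you have to do to "ensure order"; just don't fight the library. Here is the relevant snippet from TransformBlock:
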
            // If parallelism is employed, we will need to support reordering messages that complete out-of-order.
            // However, a developer can override this with EnsureOrdered == false.
            if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
            {
                _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
            }
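To make the idea behind that snippet concrete, here is a simplified illustration of a reordering buffer. It is not the library's internal ReorderingBuffer, and the name SimpleReorderingBuffer and its members are made up. Each item carries the sequence number it was given on entry, workers hand results back in any order, and the buffer releases them strictly in sequence.

```csharp
using System;
using System.Collections.Generic;

// Simplified illustration only; not the library's internal implementation.
public sealed class SimpleReorderingBuffer<T>
{
    private readonly object _gate = new object();
    private readonly Dictionary<long, T> _pending = new Dictionary<long, T>();
    private readonly Action<T> _emit;
    private long _nextId; // sequence number of the next item allowed out

    public SimpleReorderingBuffer(Action<T> emit)
    {
        _emit = emit ?? throw new ArgumentNullException(nameof(emit));
    }

    // Called by workers, in any order, with the sequence number that was
    // assigned when the item entered the block (starting at 0).
    public void Add(long id, T item)
    {
        lock (_gate)
        {
            _pending[id] = item;

            // Release every consecutive item that is now ready.
            while (_pending.TryGetValue(_nextId, out T ready))
            {
                _pending.Remove(_nextId);
                _nextId++;
                _emit(ready);
            }
        }
    }
}
```

For example, adding items with ids 2, 0 and 1 in that order emits nothing for id 2, then emits item 0 as soon as it arrives, and finally emits items 1 and 2 together once item 1 fills the gap.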

Here is a run with 20 data points, with your code modified to use parallelism in the final TBlock. The output was converted to basic CSV for viewing in Excel, i.e. replace " " => "," :)

Function,TimeStamp/Inserted JobId,Other,Other,Other,Other,Other,Other,Other,JobId From functions
ReadDocument,04:54.0,|,Thread,6,is,processing,Job,Id:,1
ReadDocument,04:54.0,|,Thread,11,is,processing,Job,Id:,2
ReadDocument,04:56.0,|,Thread,13,is,processing,Job,Id:,3
ReadDocument,04:56.0,|,Thread,6,is,processing,Job,Id:,4
ReadDocument,04:57.0,|,Thread,11,is,processing,Job,Id:,5
ReadDocument,04:57.0,|,Thread,14,is,processing,Job,Id:,6
ReadDocument,04:58.0,|,Thread,15,is,processing,Job,Id:,7
ReadDocument,04:58.0,|,Thread,6,is,processing,Job,Id:,8
ReadDocument,04:59.0,|,Thread,11,is,processing,Job,Id:,9
ReadDocument,04:59.0,|,Thread,16,is,processing,Job,Id:,10
ReadDocument,05:00.0,|,Thread,17,is,processing,Job,Id:,12
ReadDocument,05:00.0,|,Thread,15,is,processing,Job,Id:,11
ReadDocument,05:01.0,|,Thread,16,is,processing,Job,Id:,13
ReadDocument,05:01.0,|,Thread,18,is,processing,Job,Id:,14
ReadDocument,05:02.0,|,Thread,15,is,processing,Job,Id:,15
ReadDocument,05:02.0,|,Thread,17,is,processing,Job,Id:,20
ValidateXml,05:02.0,|,Thread,19,is,processing,Job,Id:,1
ReadDocument,05:02.0,|,Thread,14,is,processing,Job,Id:,17
ReadDocument,05:02.0,|,Thread,13,is,processing,Job,Id:,16
ReadDocument,05:02.0,|,Thread,11,is,processing,Job,Id:,18
ReadDocument,05:02.0,|,Thread,6,is,processing,Job,Id:,19
ValidateXml,05:03.0,|,Thread,16,is,processing,Job,Id:,2
ValidateXml,05:03.0,|,Thread,20,is,processing,Job,Id:,3
ValidateXml,05:04.0,|,Thread,11,is,processing,Job,Id:,4
ValidateXml,05:04.0,|,Thread,21,is,processing,Job,Id:,7
ValidateXml,05:04.0,|,Thread,18,is,processing,Job,Id:,5
ValidateXml,05:04.0,|,Thread,15,is,processing,Job,Id:,6
ValidateXml,05:04.5,|,Thread,16,is,processing,Job,Id:,8
ValidateXml,05:04.5,|,Thread,6,is,processing,Job,Id:,9
ValidateXml,05:04.6,|,Thread,19,is,processing,Job,Id:,10
ProcessJob,05:04.6,|,Thread,14,is,processing,Job,Id:,2
ProcessJob,05:04.6,|,Thread,22,is,processing,Job,Id:,1
ValidateXml,05:05.5,|,Thread,18,is,processing,Job,Id:,11
ValidateXml,05:05.6,|,Thread,20,is,processing,Job,Id:,12
ProcessJob,05:05.6,|,Thread,23,is,processing,Job,Id:,3
ValidateXml,05:06.5,|,Thread,6,is,processing,Job,Id:,13
ValidateXml,05:06.5,|,Thread,21,is,processing,Job,Id:,15
ID,1,was,successfully,imported.,,,,,
ValidateXml,05:06.5,|,Thread,16,is,processing,Job,Id:,14
ValidateXml,05:06.8,|,Thread,15,is,processing,Job,Id:,17
ProcessJob,05:06.8,|,Thread,24,is,processing,Job,Id:,4
ValidateXml,05:06.8,|,Thread,11,is,processing,Job,Id:,16
ProcessJob,05:06.8,|,Thread,22,is,processing,Job,Id:,5
ProcessJob,05:07.5,|,Thread,17,is,processing,Job,Id:,6
ProcessJob,05:07.5,|,Thread,25,is,processing,Job,Id:,8
ValidateXml,05:07.5,|,Thread,19,is,processing,Job,Id:,18
ProcessJob,05:07.5,|,Thread,14,is,processing,Job,Id:,7
ValidateXml,05:08.5,|,Thread,16,is,processing,Job,Id:,19
ProcessJob,05:08.5,|,Thread,23,is,processing,Job,Id:,9
ValidateXml,05:08.5,|,Thread,18,is,processing,Job,Id:,20
ProcessJob,05:09.5,|,Thread,19,is,processing,Job,Id:,10
ID,2,was,successfully,imported.,,,,,
ProcessJob,05:09.5,|,Thread,15,is,processing,Job,Id:,11
ID,3,was,successfully,imported.,,,,,
ProcessJob,05:10.6,|,Thread,14,is,processing,Job,Id:,12
ProcessJob,05:10.9,|,Thread,25,is,processing,Job,Id:,13
ProcessJob,05:11.0,|,Thread,24,is,processing,Job,Id:,14
ID,4,was,successfully,imported.,,,,,
ProcessJob,05:11.1,|,Thread,17,is,processing,Job,Id:,15
ProcessJob,05:11.3,|,Thread,22,is,processing,Job,Id:,16
ID,5,was,successfully,imported.,,,,,
ID,6,was,successfully,imported.,,,,,
ID,7,was,successfully,imported.,,,,,
ID,8,was,successfully,imported.,,,,,
ProcessJob,05:11.6,|,Thread,19,is,processing,Job,Id:,17
ProcessJob,05:11.7,|,Thread,23,is,processing,Job,Id:,18
ID,9,was,successfully,imported.,,,,,
ID,10,was,successfully,imported.,,,,,
ProcessJob,05:12.0,|,Thread,14,is,processing,Job,Id:,19
ProcessJob,05:12.4,|,Thread,15,is,processing,Job,Id:,20
ID,11,was,successfully,imported.,,,,,
ID,12,was,successfully,imported.,,,,,
ID,13,was,successfully,imported.,,,,,
ID,14,was,successfully,imported.,,,,,
ID,15,was,successfully,imported.,,,,,
ID,16,was,successfully,imported.,,,,,
ID,17,was,successfully,imported.,,,,,
ID,18,was,successfully,imported.,,,,,
ID,19,was,successfully,imported.,,,,,
ID,20,was,successfully,imported.,,,,,

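One last note: functions returning a bool for success and mapping exceptions to return codes may be problematic, but that is beyond the scope of this question. You can get a lot of good advice on best practices by posting your code on Stack Exchange Code Review.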

The body of the original answer was getting too long.

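Edit4: In response to the OP's Edit2: I am not sure exactly what changes were made to produce the output provided, but here is your source code, modified, with results showing ordered behavior for all 100 inputs.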

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing {
    public class Job {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<Object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job) {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} JobId: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job) {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} JobId: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job) {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} JobId: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO => {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o) {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job) {
            if (job.ReturnCode == 100) {
                Console.WriteLine($"CreateResponse {DateTime.Now.TimeOfDay} JobId: {job.ID}");

            }
            else {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline() {
            var loadXml = new TransformBlock<Job, Job>(job => {
                try {
                    if (ReadDocument(job)) {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job => {
                try {
                    if (ValidateXml(job)) {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job => {
                try {
                    if (ProcessJob(job)) {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            //importJob.LinkTo(reportImport);

            var output = importJob.AsObservable();
            // Note: Subscribe(Action<T>) is an extension method from the Reactive Extensions (System.Reactive) package.
            var subscription = output.Subscribe(x => {
                if (x.ReturnCode == 100) {
                    //job success
                    Console.WriteLine($"SendToDataBase {DateTime.Now.TimeOfDay} JobId: {x.ID}");
                }
                else {
                    //handle fault
                    Console.WriteLine($"Job Failed {DateTime.Now.TimeOfDay} JobId: {x.ID}");
                }
            });

            // Return the head of the network.
            return loadXml;
        }

        public void Start() {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id) {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions() {
            return new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions() {
            return new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel() {
            if (cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args) {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach (var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}

Results

Function,Timestamp,Other,JobId
ReadDocument,08:11:27.2200011,JobId:,1
ReadDocument,08:11:27.2240007,JobId:,2
ReadDocument,08:11:29.7562763,JobId:,3
ReadDocument,08:11:29.7662792,JobId:,4
ReadDocument,08:11:30.7013793,JobId:,5
ReadDocument,08:11:31.7024931,JobId:,6
ReadDocument,08:11:31.7034925,JobId:,7
ReadDocument,08:11:32.7306060,JobId:,9
ReadDocument,08:11:32.7306060,JobId:,8
ReadDocument,08:11:33.7027033,JobId:,10
ReadDocument,08:11:33.7027033,JobId:,11
ReadDocument,08:11:34.7018217,JobId:,12
ReadDocument,08:11:34.7028153,JobId:,13
ReadDocument,08:11:35.7019214,JobId:,14
ReadDocument,08:11:35.7069235,JobId:,15
ReadDocument,08:11:35.7069235,JobId:,16
ReadDocument,08:11:35.7069235,JobId:,17
ReadDocument,08:11:35.7079221,JobId:,18
ValidateXml,08:11:35.7119363,JobId:,1
ValidateXml,08:11:36.7060334,JobId:,2
ReadDocument,08:11:36.7060334,JobId:,19
ReadDocument,08:11:36.7070332,JobId:,20
ReadDocument,08:11:37.7071383,JobId:,21
ReadDocument,08:11:37.7071383,JobId:,22
ReadDocument,08:11:37.7081392,JobId:,23
ValidateXml,08:11:37.7091421,JobId:,3
ReadDocument,08:11:38.7032496,JobId:,24
ValidateXml,08:11:38.7052496,JobId:,6
ValidateXml,08:11:38.7042513,JobId:,4
ReadDocument,08:11:38.7052496,JobId:,27
ValidateXml,08:11:38.7042513,JobId:,5
ReadDocument,08:11:38.7052496,JobId:,28
ReadDocument,08:11:38.7042513,JobId:,26
ReadDocument,08:11:38.7032496,JobId:,25
ValidateXml,08:11:39.7023545,JobId:,7
ReadDocument,08:11:39.7023545,JobId:,29
ValidateXml,08:11:39.7023545,JobId:,8
ReadDocument,08:11:40.7064634,JobId:,30
ReadDocument,08:11:40.7064634,JobId:,31
ValidateXml,08:11:40.7084642,JobId:,9
ValidateXml,08:11:41.7045755,JobId:,10
ReadDocument,08:11:41.7085762,JobId:,33
ValidateXml,08:11:41.7105750,JobId:,11
ValidateXml,08:11:41.7115767,JobId:,12
ValidateXml,08:11:41.7135740,JobId:,13
ValidateXml,08:11:41.7155790,JobId:,14
ReadDocument,08:11:41.7085762,JobId:,34
ReadDocument,08:11:41.7045755,JobId:,32
ReadDocument,08:11:41.7105750,JobId:,35
ReadDocument,08:11:41.7135740,JobId:,36
ReadDocument,08:11:42.7086844,JobId:,37
ValidateXml,08:11:42.7116926,JobId:,15
ValidateXml,08:11:42.7126878,JobId:,16
ReadDocument,08:11:42.7116926,JobId:,38
ValidateXml,08:11:43.7027911,JobId:,17
ValidateXml,08:11:43.7027911,JobId:,18
ValidateXml,08:11:43.7068030,JobId:,20
ProcessJob,08:11:43.7097908,JobId:,1
ValidateXml,08:11:43.7057897,JobId:,19
ReadDocument,08:11:43.7057897,JobId:,39
ReadDocument,08:11:43.7077893,JobId:,40
ReadDocument,08:11:44.7038990,JobId:,41
ProcessJob,08:11:44.7059002,JobId:,2
ValidateXml,08:11:44.7049004,JobId:,21
ReadDocument,08:11:44.7038990,JobId:,42
ValidateXml,08:11:44.7059002,JobId:,22
ReadDocument,08:11:44.7089023,JobId:,44
ReadDocument,08:11:44.7049004,JobId:,43
ReadDocument,08:11:45.7030090,JobId:,45
ValidateXml,08:11:45.7030090,JobId:,23
ValidateXml,08:11:45.7120179,JobId:,24
ValidateXml,08:11:45.7120179,JobId:,25
ReadDocument,08:11:45.7140087,JobId:,46
ValidateXml,08:11:45.7170104,JobId:,26
ReadDocument,08:11:45.7190107,JobId:,47
ProcessJob,08:11:45.7200086,JobId:,3
ValidateXml,08:11:45.7170104,JobId:,27
ReadDocument,08:11:46.7071167,JobId:,48
ReadDocument,08:11:46.7101161,JobId:,50
ProcessJob,08:11:46.7111152,JobId:,4
ValidateXml,08:11:46.7111152,JobId:,28
ReadDocument,08:11:46.7071167,JobId:,49
ValidateXml,08:11:47.7032249,JobId:,29
ReadDocument,08:11:47.7062243,JobId:,51
ReadDocument,08:11:47.7072261,JobId:,52
ReadDocument,08:11:47.7092253,JobId:,53
ProcessJob,08:11:47.7102243,JobId:,5
ProcessJob,08:11:47.7112241,JobId:,7
ReadDocument,08:11:47.7102243,JobId:,55
ValidateXml,08:11:47.7062243,JobId:,30
ProcessJob,08:11:47.7102243,JobId:,6
ValidateXml,08:11:47.7072261,JobId:,31
ReadDocument,08:11:47.7092253,JobId:,54
ReadDocument,08:11:48.7063329,JobId:,56
ProcessJob,08:11:48.7073331,JobId:,8
ValidateXml,08:11:48.7063329,JobId:,32
ValidateXml,08:11:48.7063329,JobId:,33
ValidateXml,08:11:49.7074443,JobId:,34
ReadDocument,08:11:49.7104422,JobId:,59
ReadDocument,08:11:49.7124418,JobId:,60
ProcessJob,08:11:49.7124418,JobId:,9
ValidateXml,08:11:49.7144433,JobId:,36
ValidateXml,08:11:49.7114420,JobId:,35
ReadDocument,08:11:49.7074443,JobId:,57
ReadDocument,08:11:49.7084468,JobId:,58
ValidateXml,08:11:50.7065604,JobId:,37
ReadDocument,08:11:50.7095502,JobId:,61
ProcessJob,08:11:50.7105504,JobId:,10
ReadDocument,08:11:50.7115502,JobId:,63
ValidateXml,08:11:50.7125515,JobId:,40
ReadDocument,08:11:50.7105504,JobId:,62
ValidateXml,08:11:50.7095502,JobId:,39
ValidateXml,08:11:50.7075518,JobId:,38
ReadDocument,08:11:50.7115502,JobId:,64
ReadDocument,08:11:51.7076596,JobId:,65
ReadDocument,08:11:51.7086597,JobId:,66
ProcessJob,08:11:51.7116603,JobId:,13
ProcessJob,08:11:51.7106605,JobId:,12
ProcessJob,08:11:51.7086597,JobId:,11
ValidateXml,08:11:51.7076596,JobId:,41
SendToDataBase,08:11:51.7366672,JobId:,1
SendToDataBase,08:11:51.7416631,JobId:,2
SendToDataBase,08:11:51.7496646,JobId:,3
CreateResponse,08:11:51.7546639,JobId:,56
ValidateXml,08:11:52.7037712,JobId:,42
ValidateXml,08:11:52.7037712,JobId:,43
ValidateXml,08:11:52.7077662,JobId:,44
ReadDocument,08:11:52.7107675,JobId:,69
ProcessJob,08:11:52.7077662,JobId:,14
ProcessJob,08:11:52.7077662,JobId:,15
ProcessJob,08:11:52.7087683,JobId:,16
ProcessJob,08:11:52.7087683,JobId:,17
ValidateXml,08:11:52.7097669,JobId:,45
ReadDocument,08:11:52.7097669,JobId:,67
ValidateXml,08:11:52.7097669,JobId:,46
ReadDocument,08:11:52.7107675,JobId:,68
ValidateXml,08:11:53.7069300,JobId:,47
ReadDocument,08:11:53.7078801,JobId:,70
ValidateXml,08:11:53.7108792,JobId:,48
SendToDataBase,08:11:53.7118774,JobId:,4
SendToDataBase,08:11:53.7208818,JobId:,5
SendToDataBase,08:11:53.7228802,JobId:,6
SendToDataBase,08:11:53.7238781,JobId:,7
SendToDataBase,08:11:53.7258800,JobId:,8
ReadDocument,08:11:53.7118774,JobId:,73
ReadDocument,08:11:53.7098805,JobId:,71
ReadDocument,08:11:53.7118774,JobId:,72
ValidateXml,08:11:54.7059933,JobId:,49
ValidateXml,08:11:54.7069847,JobId:,50
ValidateXml,08:11:54.7089874,JobId:,51
CreateResponse,08:11:54.7109862,JobId:,41
CreateResponse,08:11:54.7169842,JobId:,42
SendToDataBase,08:11:54.7149888,JobId:,9
SendToDataBase,08:11:54.7259874,JobId:,10
SendToDataBase,08:11:54.7269883,JobId:,11
ProcessJob,08:11:54.7119868,JobId:,18
ReadDocument,08:11:54.7059933,JobId:,74
ValidateXml,08:11:54.7109862,JobId:,53
ProcessJob,08:11:54.7119868,JobId:,19
ProcessJob,08:11:54.7129854,JobId:,20
ValidateXml,08:11:54.7099852,JobId:,52
ReadDocument,08:11:54.7129854,JobId:,76
ReadDocument,08:11:54.7069847,JobId:,75
ReadDocument,08:11:55.7090940,JobId:,77
ReadDocument,08:11:55.7140926,JobId:,78
ValidateXml,08:11:55.7140926,JobId:,54
SendToDataBase,08:11:55.7180953,JobId:,12
CreateResponse,08:11:55.7180953,JobId:,43
ProcessJob,08:11:55.7180953,JobId:,21
SendToDataBase,08:11:55.7230962,JobId:,13
ValidateXml,08:11:55.7170947,JobId:,55
ReadDocument,08:11:55.7160937,JobId:,79
ReadDocument,08:11:55.7170947,JobId:,80
ValidateXml,08:11:55.8111031,JobId:,57
ReadDocument,08:11:55.8111031,JobId:,81
ProcessJob,08:11:55.8451120,JobId:,22
ProcessJob,08:11:56.1251577,JobId:,23
ReadDocument,08:11:56.2531569,JobId:,82
ReadDocument,08:11:56.3441756,JobId:,83
ProcessJob,08:11:56.3571695,JobId:,24
ValidateXml,08:11:56.3851785,JobId:,58
ReadDocument,08:11:56.4061804,JobId:,84
ValidateXml,08:11:56.6222012,JobId:,59
CreateResponse,08:11:56.6222012,JobId:,49
ProcessJob,08:11:56.9112320,JobId:,25
ValidateXml,08:11:56.9412405,JobId:,60
ProcessJob,08:11:57.0002533,JobId:,26
ValidateXml,08:11:57.2352587,JobId:,61
ProcessJob,08:11:57.4852908,JobId:,27
ReadDocument,08:11:58.2093656,JobId:,85
SendToDataBase,08:11:58.2163692,JobId:,14
ReadDocument,08:11:58.2113664,JobId:,87
SendToDataBase,08:11:58.2203645,JobId:,15
SendToDataBase,08:11:58.2293743,JobId:,16
SendToDataBase,08:11:58.2303706,JobId:,17
SendToDataBase,08:11:58.2313662,JobId:,18
SendToDataBase,08:11:58.2333692,JobId:,19
SendToDataBase,08:11:58.2353681,JobId:,20
SendToDataBase,08:11:58.2373688,JobId:,21
SendToDataBase,08:11:58.2383671,JobId:,22
SendToDataBase,08:11:58.2393673,JobId:,23
ValidateXml,08:11:58.2123658,JobId:,63
CreateResponse,08:11:58.2163692,JobId:,50
CreateResponse,08:11:58.2543716,JobId:,51
CreateResponse,08:11:58.2643699,JobId:,52
CreateResponse,08:11:58.2663730,JobId:,53
ProcessJob,08:11:58.2143646,JobId:,31
ProcessJob,08:11:58.2123658,JobId:,29
ReadDocument,08:11:58.2093656,JobId:,86
ReadDocument,08:11:58.2123658,JobId:,88
ProcessJob,08:11:58.2133656,JobId:,30
ProcessJob,08:11:58.2103650,JobId:,28
ValidateXml,08:11:58.2113664,JobId:,62
ReadDocument,08:11:58.2123658,JobId:,89
ValidateXml,08:11:58.2133656,JobId:,64
ValidateXml,08:11:59.7055294,JobId:,65
ReadDocument,08:11:59.7065300,JobId:,91
ValidateXml,08:11:59.7065300,JobId:,66
SendToDataBase,08:11:59.7115275,JobId:,24
SendToDataBase,08:11:59.7195324,JobId:,25
SendToDataBase,08:11:59.7205330,JobId:,26
ProcessJob,08:11:59.7085277,JobId:,33
ValidateXml,08:11:59.7085277,JobId:,68
ReadDocument,08:11:59.7095263,JobId:,93
ValidateXml,08:11:59.7085277,JobId:,67
ReadDocument,08:11:59.7095263,JobId:,92
ProcessJob,08:11:59.7095263,JobId:,34
ProcessJob,08:11:59.7075275,JobId:,32
ReadDocument,08:11:59.7055294,JobId:,90
ValidateXml,08:11:59.7105265,JobId:,70
ValidateXml,08:11:59.7095263,JobId:,69
ReadDocument,08:11:59.7105265,JobId:,94
ValidateXml,08:12:00.7146358,JobId:,71
SendToDataBase,08:12:00.7176364,JobId:,27
ReadDocument,08:12:00.7156372,JobId:,97
ProcessJob,08:12:00.7146358,JobId:,35
ProcessJob,08:12:00.7156372,JobId:,36
ReadDocument,08:12:00.7146358,JobId:,95
ReadDocument,08:12:00.7156372,JobId:,96
ReadDocument,08:12:00.8616797,JobId:,98
ValidateXml,08:12:00.8796565,JobId:,72
ReadDocument,08:12:00.9066595,JobId:,99
ReadDocument,08:12:00.9786697,JobId:,100
ValidateXml,08:12:00.9866692,JobId:,73
ProcessJob,08:12:01.0766830,JobId:,37
ValidateXml,08:12:01.1176829,JobId:,74
ProcessJob,08:12:01.1176829,JobId:,38
ProcessJob,08:12:01.2167037,JobId:,39
SendToDataBase,08:12:01.2167037,JobId:,28
SendToDataBase,08:12:01.2216970,JobId:,29
SendToDataBase,08:12:01.2236923,JobId:,30
SendToDataBase,08:12:01.2246914,JobId:,31
ValidateXml,08:12:01.2327001,JobId:,75
ValidateXml,08:12:01.5447286,JobId:,76
ProcessJob,08:12:01.6567738,JobId:,40
ValidateXml,08:12:01.9347686,JobId:,77
ProcessJob,08:12:02.2498041,JobId:,44
ProcessJob,08:12:02.4448257,JobId:,45
SendToDataBase,08:12:02.4458286,JobId:,32
ValidateXml,08:12:02.5469861,JobId:,78
ProcessJob,08:12:02.6268456,JobId:,46
SendToDataBase,08:12:02.6278997,JobId:,33
SendToDataBase,08:12:02.6378977,JobId:,34
SendToDataBase,08:12:02.6398461,JobId:,35
ValidateXml,08:12:02.6538506,JobId:,79
ProcessJob,08:12:03.1399063,JobId:,47
SendToDataBase,08:12:03.1489053,JobId:,36
ValidateXml,08:12:03.2979184,JobId:,80
ProcessJob,08:12:03.4959402,JobId:,48
ValidateXml,08:12:03.6259629,JobId:,81
ValidateXml,08:12:03.6769676,JobId:,82
ProcessJob,08:12:03.7719693,JobId:,54
ProcessJob,08:12:03.8519797,JobId:,55
ProcessJob,08:12:03.9689901,JobId:,57
SendToDataBase,08:12:04.0079945,JobId:,37
SendToDataBase,08:12:04.0099953,JobId:,38
SendToDataBase,08:12:04.0109931,JobId:,39
SendToDataBase,08:12:04.0119941,JobId:,40
ValidateXml,08:12:04.0299989,JobId:,84
ValidateXml,08:12:04.0089966,JobId:,83
ProcessJob,08:12:04.3350372,JobId:,58
ValidateXml,08:12:04.6541474,JobId:,85
ProcessJob,08:12:04.8791864,JobId:,59
SendToDataBase,08:12:04.8791864,JobId:,44
SendToDataBase,08:12:05.0252098,JobId:,45
SendToDataBase,08:12:05.0757198,JobId:,46
ProcessJob,08:12:05.0757198,JobId:,60
ValidateXml,08:12:05.1527328,JobId:,86
ProcessJob,08:12:05.1532325,JobId:,61
ValidateXml,08:12:05.2762716,JobId:,87
ValidateXml,08:12:05.3793706,JobId:,88
ValidateXml,08:12:05.5953056,JobId:,89
ValidateXml,08:12:05.6453136,JobId:,90
ProcessJob,08:12:05.8313378,JobId:,62
SendToDataBase,08:12:05.8313378,JobId:,47
ValidateXml,08:12:06.1573930,JobId:,91
ValidateXml,08:12:06.2043839,JobId:,92
ProcessJob,08:12:06.4384015,JobId:,63
SendToDataBase,08:12:06.4384015,JobId:,48
ProcessJob,08:12:06.6554190,JobId:,64
ProcessJob,08:12:06.7494355,JobId:,65
SendToDataBase,08:12:06.7494355,JobId:,54
SendToDataBase,08:12:06.7594308,JobId:,55
SendToDataBase,08:12:06.7624294,JobId:,57
ProcessJob,08:12:06.9254482,JobId:,66
SendToDataBase,08:12:06.9254482,JobId:,58
ValidateXml,08:12:07.0154624,JobId:,93
ValidateXml,08:12:07.0975086,JobId:,94
ProcessJob,08:12:07.1925138,JobId:,67
ValidateXml,08:12:07.2724877,JobId:,95
ProcessJob,08:12:07.6385268,JobId:,68
ProcessJob,08:12:07.7705429,JobId:,69
ValidateXml,08:12:07.8315476,JobId:,96
ProcessJob,08:12:07.8905526,JobId:,70
SendToDataBase,08:12:07.8905526,JobId:,59
SendToDataBase,08:12:07.8965534,JobId:,60
SendToDataBase,08:12:07.8975535,JobId:,61
ValidateXml,08:12:08.1306009,JobId:,97
ValidateXml,08:12:08.2065895,JobId:,98
ValidateXml,08:12:08.3106332,JobId:,99
ProcessJob,08:12:08.3296082,JobId:,71
ValidateXml,08:12:08.4406159,JobId:,100
ProcessJob,08:12:08.8396557,JobId:,72
SendToDataBase,08:12:08.8446570,JobId:,62
SendToDataBase,08:12:08.8806613,JobId:,63
SendToDataBase,08:12:08.8946619,JobId:,64
ProcessJob,08:12:09.0076746,JobId:,73
SendToDataBase,08:12:09.0086763,JobId:,65
ProcessJob,08:12:09.0996850,JobId:,74
ProcessJob,08:12:09.1106847,JobId:,75
SendToDataBase,08:12:09.1106847,JobId:,66
SendToDataBase,08:12:09.1136860,JobId:,67
ProcessJob,08:12:09.6547630,JobId:,76
SendToDataBase,08:12:09.6557462,JobId:,68
ProcessJob,08:12:09.9218032,JobId:,77
ProcessJob,08:12:10.2218075,JobId:,78
ProcessJob,08:12:10.4288308,JobId:,79
SendToDataBase,08:12:10.4288308,JobId:,69
SendToDataBase,08:12:10.4408307,JobId:,70
SendToDataBase,08:12:10.4448318,JobId:,71
ProcessJob,08:12:10.6858596,JobId:,80
SendToDataBase,08:12:10.6858596,JobId:,72
ProcessJob,08:12:11.4049481,JobId:,81
ProcessJob,08:12:11.7039814,JobId:,82
ProcessJob,08:12:11.8272054,JobId:,83
ProcessJob,08:12:11.9930072,JobId:,84
SendToDataBase,08:12:11.9930072,JobId:,73
SendToDataBase,08:12:11.9979988,JobId:,74
SendToDataBase,08:12:11.9989983,JobId:,75
SendToDataBase,08:12:11.9989983,JobId:,76
ProcessJob,08:12:12.3460366,JobId:,85
ProcessJob,08:12:12.4520491,JobId:,86
SendToDataBase,08:12:12.4520491,JobId:,77
ProcessJob,08:12:12.8810952,JobId:,87
ProcessJob,08:12:13.1443167,JobId:,88
SendToDataBase,08:12:13.1443167,JobId:,78
SendToDataBase,08:12:13.1471282,JobId:,79
ProcessJob,08:12:13.2041414,JobId:,89
SendToDataBase,08:12:13.2081302,JobId:,80
SendToDataBase,08:12:13.2101309,JobId:,81
ProcessJob,08:12:13.4381566,JobId:,90
SendToDataBase,08:12:13.4392215,JobId:,82
ProcessJob,08:12:13.6411889,JobId:,91
SendToDataBase,08:12:13.6411889,JobId:,83
ProcessJob,08:12:13.9472212,JobId:,92
SendToDataBase,08:12:13.9472212,JobId:,84
ProcessJob,08:12:14.3122494,JobId:,93
ProcessJob,08:12:14.7053031,JobId:,94
SendToDataBase,08:12:14.7053031,JobId:,85
SendToDataBase,08:12:14.7092946,JobId:,86
ProcessJob,08:12:14.9393634,JobId:,95
ProcessJob,08:12:15.4103709,JobId:,96
SendToDataBase,08:12:15.4113707,JobId:,87
ProcessJob,08:12:15.9355263,JobId:,97
ProcessJob,08:12:15.9724349,JobId:,98
SendToDataBase,08:12:15.9724349,JobId:,88
SendToDataBase,08:12:15.9774350,JobId:,89
ProcessJob,08:12:15.9724349,JobId:,99
SendToDataBase,08:12:15.9784371,JobId:,90
SendToDataBase,08:12:15.9834330,JobId:,91
ProcessJob,08:12:16.6175125,JobId:,100
SendToDataBase,08:12:16.6175125,JobId:,92
SendToDataBase,08:12:16.6555160,JobId:,93
SendToDataBase,08:12:17.5005984,JobId:,94
SendToDataBase,08:12:17.8846409,JobId:,95
SendToDataBase,08:12:17.8886408,JobId:,96
SendToDataBase,08:12:18.1186677,JobId:,97
SendToDataBase,08:12:18.7557365,JobId:,98
SendToDataBase,08:12:18.7567394,JobId:,99
SendToDataBase,08:12:19.5488221,JobId:,100

Edit: The new subscription will send your items to the Db, or handle failed jobs however you choose.

More resources:

Stack Exchange Code Review

Dataflow Source