Using TPL Dataflow, how do I compose a workflow with a BatchBlock in the middle and then go back to a per-item block?

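The idea is to process a list of items individually, then process them in batches, and then seamlessly go back to individual processing. In the batch block I might be querying or saving to a database. Hitting the database once per batch is far more efficient than hitting it once for every item in the list.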

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
            var block1 = new TransformBlock<WorkItem, WorkItem>(async item =>
            {
                // perform work on individual item
                await Task.Delay(1000);
                Console.WriteLine($"Block 1 - Item {item.Id}");
                return item;
            }, execOptions);
            var block2 = new TransformBlock<WorkItem, WorkItem>(async item =>
            {
                // perform more work on individual item
                await Task.Delay(1000);
                Console.WriteLine($"Block 2 - Item {item.Id}");
                return item;
            }, execOptions);
            var batch = new BatchBlock<WorkItem>(5);
            var batchWork = new ActionBlock<WorkItem[]>(async items =>
            {
                Console.WriteLine($"batchWork - {items.Length} Items");
                // perform batch work - query database, etc.
                await Task.Delay(2000);
                await Task.WhenAll(items.Select(x => block2.SendAsync(x)));
            }, execOptions);
            var batch2 = new BatchBlock<WorkItem>(10);
            var save = new ActionBlock<WorkItem[]>(async items =>
            {
                Console.WriteLine($"save - {items.Length} Items");
                // save items to the DB
                await Task.Delay(2000);
            }, execOptions);

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            block1.LinkTo(batch, linkOptions);
            batch.LinkTo(batchWork, linkOptions);
            block2.LinkTo(batch2, linkOptions);
            batch2.LinkTo(save, linkOptions);

            Console.WriteLine("Starting work");
            var workItems = Enumerable.Range(1, 10).Select(x => new WorkItem { Id = x }).ToArray();
            await Task.WhenAll(workItems.Select(x => block1.SendAsync(x)));
            block1.Complete();
            await batchWork.Completion;

            block2.Complete();
            await save.Completion;

            Console.WriteLine("All Done");
            Console.WriteLine("Hit Enter");
            Console.ReadLine();
        }
    }
    class WorkItem
    {
        public int Id { get; set; }

    }
}

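I'm looking for some feedback. The code sample above seems to work. The key piece is in "batchWork", where I queue items to "block2" by calling SendAsync on each one. I don't know of any other way to link the blocks up. Maybe there is a better way to accomplish what I'm trying to do here. Any suggestions?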

You don't need to use SendAsync. You can change batchWork to a TransformManyBlock and link it to the next block:

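var batchWork = new TransformManyBlock<WorkItem[], WorkItem>(async items =>
{
    Console.WriteLine($"batchWork - {items.Length} Items");
    // perform batch work - query database, etc.
    await Task.Delay(2000);

    // every element of the returned array is forwarded individually to the linked block
    return items;
}, execOptions);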

....

batch.LinkTo(batchWork, linkOptions);
batchWork.LinkTo(block2, linkOptions);
block2.LinkTo(batch2, linkOptions);
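
For reference, here is a minimal sketch of how the whole pipeline might look once batchWork is a TransformManyBlock. The `Pipeline` class, the `RunAsync` method, its `count` and `batchSize` parameters, and the `savedIds` collection are hypothetical names added for illustration, and the short delays only stand in for real work. Because every link propagates completion, this sketch assumes it is enough to complete block1 and await the last block, so the manual `block2.Complete()` call from the question should no longer be needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class WorkItem
{
    public int Id { get; set; }
}

// Hypothetical wrapper, not part of the original question or answer.
public static class Pipeline
{
    // Sends `count` items through
    // block1 -> batch -> batchWork -> block2 -> batch2 -> save
    // and returns the ids that reached the save block, sorted ascending.
    public static async Task<List<int>> RunAsync(int count, int batchSize)
    {
        var execOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var savedIds = new ConcurrentBag<int>();

        var block1 = new TransformBlock<WorkItem, WorkItem>(async item =>
        {
            await Task.Delay(10); // placeholder for per-item work
            return item;
        }, execOptions);

        var batch = new BatchBlock<WorkItem>(batchSize);

        // Receives an array, emits its elements one by one to the next block.
        var batchWork = new TransformManyBlock<WorkItem[], WorkItem>(async items =>
        {
            await Task.Delay(10); // placeholder for the batched database call
            return items;
        }, execOptions);

        var block2 = new TransformBlock<WorkItem, WorkItem>(async item =>
        {
            await Task.Delay(10); // placeholder for more per-item work
            return item;
        }, execOptions);

        var batch2 = new BatchBlock<WorkItem>(batchSize * 2);

        var save = new ActionBlock<WorkItem[]>(async items =>
        {
            await Task.Delay(10); // placeholder for the batched save
            foreach (var item in items)
            {
                savedIds.Add(item.Id);
            }
        }, execOptions);

        block1.LinkTo(batch, linkOptions);
        batch.LinkTo(batchWork, linkOptions);
        batchWork.LinkTo(block2, linkOptions);
        block2.LinkTo(batch2, linkOptions);
        batch2.LinkTo(save, linkOptions);

        foreach (var id in Enumerable.Range(1, count))
        {
            await block1.SendAsync(new WorkItem { Id = id });
        }

        // Completion flows down the chain; each BatchBlock flushes any partial batch.
        block1.Complete();
        await save.Completion;

        return savedIds.OrderBy(id => id).ToList();
    }
}
```

Calling `await Pipeline.RunAsync(10, 5)` from an async Main would push ten items through the chain. When the item count is not a multiple of the batch size, the final batch is simply smaller, since a BatchBlock emits whatever it has buffered when it is completed.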