Implementing a pipeline with a precondition using TPL Dataflow

I have a question about implementing a pipeline with the TPL Dataflow library.

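My situation is that I have a piece of software that needs to process some tasks concurrently. The processing looks like this: first we process an album at the global level, then we go into the album and process each picture individually. Assume the application has processing slots and that they are configurable (for the sake of example, let's say slots = 2). This means the application can process any of the following: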

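a) two albums at the same time
b) one album + one photo from a different album
c) two photos from the same album at the same time
d) two photos from different albums at the same time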

Currently I have implemented the process like this:

var albumTransferBlock = new TransformBlock<Album, Album>(ProcessAlbum,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

ActionBlock<Album> photoActionBlock = new ActionBlock<Album>(ProcessPhoto);

albumTransferBlock.LinkTo(photoActionBlock);


Album ProcessAlbum(Album a)
{
    return a;
}

void ProcessPhoto(Album album)
{
    foreach (var photo in album)
    {
        // do some processing
    }
}

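The problem I have is that when I process a single album, the application never uses both slots to process its photos. It meets all the requirements except c).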

Can anyone help me solve this with TPL Dataflow?

I think I can answer this myself. What I did was:

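1) I created an interface IProcessor with a method Process().
2) I wrapped AlbumProcessing and PhotoProcessing behind the IProcessor interface.
3) I created an ActionBlock that takes an IProcessor as input and executes its Process method.
4) At the end of processing an album, I post the processing of all its photos to the same ActionBlock.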

This meets my requirements 100%. Maybe someone has another solution?
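The four steps above could look roughly like this. It is a minimal sketch, not the poster's actual code: the Album and Photo records, the WorkQueue, AlbumProcessor and PhotoProcessor classes, and the Thread.Sleep stand-ins are all hypothetical. The self-answer also doesn't say how a single ActionBlock that feeds itself gets completed, so the sketch assumes a pending-work counter that starts at 1 (held by the producer) and calls Complete() when it reaches zero. It assumes C# 9 or later and that System.Threading.Tasks.Dataflow is available (recent .NET versions include it; on .NET Framework it comes from the NuGet package of the same name).

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical domain types, assumed for illustration only.
public record Photo(string Name);
public record Album(string Name, IReadOnlyList<Photo> Photos);

// Step 1: one interface for every unit of work.
public interface IProcessor
{
    void Process();
}

// Step 3: a single ActionBlock<IProcessor> whose MaxDegreeOfParallelism is the
// number of slots, so albums and photos compete for the same slots.
public sealed class WorkQueue
{
    private readonly ActionBlock<IProcessor> _block;
    private int _pending = 1; // the extra 1 is released by CompleteAsync()
    private int _running;     // work items executing right now
    private int _peak;        // highest value _running has reached

    public int Peak => Volatile.Read(ref _peak);

    public WorkQueue(int slots)
    {
        _block = new ActionBlock<IProcessor>(item =>
        {
            int now = Interlocked.Increment(ref _running);
            int seen;
            while (now > (seen = Volatile.Read(ref _peak)) &&
                   Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }
            try { item.Process(); }
            finally
            {
                Interlocked.Decrement(ref _running);
                // Nothing queued or running and the producer is done: complete.
                if (Interlocked.Decrement(ref _pending) == 0) _block.Complete();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = slots });
    }

    public void Post(IProcessor item)
    {
        Interlocked.Increment(ref _pending);
        _block.Post(item);
    }

    // Called once by the producer after it has posted all albums.
    public Task CompleteAsync()
    {
        if (Interlocked.Decrement(ref _pending) == 0) _block.Complete();
        return _block.Completion;
    }
}

// Step 2: album and photo processing wrapped behind IProcessor.
public sealed class AlbumProcessor : IProcessor
{
    private readonly Album _album;
    private readonly WorkQueue _queue;

    public AlbumProcessor(Album album, WorkQueue queue) { _album = album; _queue = queue; }

    public void Process()
    {
        Thread.Sleep(30); // stand-in for the album-level work
        // Step 4: once the album is done, enqueue one work item per photo.
        foreach (var photo in _album.Photos)
            _queue.Post(new PhotoProcessor(photo));
    }
}

public sealed class PhotoProcessor : IProcessor
{
    public static int Processed; // photos finished so far (demo counter)
    public Photo Photo { get; }

    public PhotoProcessor(Photo photo) => Photo = photo;

    public void Process()
    {
        Thread.Sleep(30); // stand-in for the per-photo work on Photo
        Interlocked.Increment(ref Processed);
    }
}
```

The producer would create `new WorkQueue(slots: 2)`, post a `new AlbumProcessor(album, queue)` for each album, and then await `queue.CompleteAsync()`. Because a photo item is posted before its album item finishes, the pending counter cannot reach zero while work is still outstanding.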

You could use a TransformManyBlock for processing the albums, linked to an ActionBlock for processing the photos, so that each album is processed before its photos are. To impose a concurrency limit that spans more than a single block, you could use either a limited-concurrency TaskScheduler or a SemaphoreSlim. The second option is more flexible, since it can throttle asynchronous operations as well. In your case all the operations are synchronous, so you are free to choose either approach. In both cases you should still set the MaxDegreeOfParallelism option of each block to the desired maximum concurrency limit; otherwise, if you make them unbounded, the order of processing will become too random.

Here is an example of the TaskScheduler approach. It uses the ConcurrentScheduler property of the ConcurrentExclusiveSchedulerPair class:

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    TaskScheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default,
        maxConcurrencyLevel: 2).ConcurrentScheduler
};

var albumsBlock = new TransformManyBlock<Album, Photo>(album =>
{
    ProcessAlbum(album);
    return album.Photos;
}, options);

var photosBlock = new ActionBlock<Photo>(photo =>
{
    ProcessPhoto(photo);
}, options);

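// PropagateCompletion: completing albumsBlock will also complete photosBlock.
albumsBlock.LinkTo(photosBlock, new DataflowLinkOptions { PropagateCompletion = true });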

Here is an example of the SemaphoreSlim approach. Using the WaitAsync method instead of Wait has the advantage that waiting to acquire the semaphore happens asynchronously, so ThreadPool threads are not blocked needlessly:

var throttler = new SemaphoreSlim(2);

var albumsBlock = new TransformManyBlock<Album, Photo>(async album =>
{
    await throttler.WaitAsync();
    try
    {
        ProcessAlbum(album);
        return album.Photos;
    }
    finally { throttler.Release(); }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

var photosBlock = new ActionBlock<Photo>(async photo =>
{
    await throttler.WaitAsync();
    try
    {
        ProcessPhoto(photo);
    }
    finally { throttler.Release(); }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

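// PropagateCompletion: completing albumsBlock will also complete photosBlock.
albumsBlock.LinkTo(photosBlock, new DataflowLinkOptions { PropagateCompletion = true });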
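To check that a single limit really spans both blocks, the TaskScheduler variant can be wrapped in a small harness like the following. It is a sketch under assumptions: the Album and Photo records, the SharedLimitDemo class, its concurrency counters, and the Thread.Sleep bodies of ProcessAlbum and ProcessPhoto are hypothetical stand-ins. It links the blocks with PropagateCompletion = true, so that calling albumsBlock.Complete() eventually completes photosBlock and awaiting photosBlock.Completion waits for every photo.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical domain types, assumed for illustration only.
public record Photo(string Name);
public record Album(string Name, IReadOnlyList<Photo> Photos);

public static class SharedLimitDemo
{
    private static int _running;         // work items executing right now
    private static int _peak;            // highest value _running has reached
    private static int _photosProcessed;

    // Marks the start of a work item and updates the observed peak.
    private static void Enter()
    {
        int now = Interlocked.Increment(ref _running);
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)) &&
               Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }
    }

    private static void Exit() => Interlocked.Decrement(ref _running);

    // Hypothetical stand-ins for the real album and photo work.
    private static void ProcessAlbum(Album album)
    {
        Enter();
        Thread.Sleep(30);
        Exit();
    }

    private static void ProcessPhoto(Photo photo)
    {
        Enter();
        Thread.Sleep(30);
        Interlocked.Increment(ref _photosProcessed);
        Exit();
    }

    public static async Task<(int Photos, int Peak)> RunAsync(IEnumerable<Album> albums)
    {
        _running = 0; _peak = 0; _photosProcessed = 0;

        // One scheduler shared by both blocks caps their combined concurrency at 2.
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
            TaskScheduler = new ConcurrentExclusiveSchedulerPair(
                TaskScheduler.Default, maxConcurrencyLevel: 2).ConcurrentScheduler
        };

        var albumsBlock = new TransformManyBlock<Album, Photo>(album =>
        {
            ProcessAlbum(album);
            return album.Photos;
        }, options);

        var photosBlock = new ActionBlock<Photo>(photo => ProcessPhoto(photo), options);

        // photosBlock completes once albumsBlock has completed and drained.
        albumsBlock.LinkTo(photosBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var album in albums) albumsBlock.Post(album);
        albumsBlock.Complete();
        await photosBlock.Completion;
        return (Volatile.Read(ref _photosProcessed), Volatile.Read(ref _peak));
    }
}
```

Swapping in the SemaphoreSlim version should only change how the two blocks are constructed; with a semaphore of 2 wrapping both delegates, the observed peak should likewise stay at or below 2.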