TPL:处理已处理的项目

TPL: Dispose processed items

在 C# 中,我使用 Task Parallel Library (TPL) 下载图像、处理图像并保存分析结果。简化代码如下。

var getImage = new TransformBlock<int, Image>(GetImage);
var proImage = new TransformBlock<Image, double>(ProcessImage);
var saveRes = new ActionBlock<double>(SaveResult);

var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };

getImage.LinkTo(proImage, linkOptions);
proImage.LinkTo(SaveRes, linkOptions);

for (int x = 0; x < 1000000; x++)
    getImage.Post(x);
getImage.Complete();

SaveRes.Completion.Wait();

除内存使用情况外,这按预期工作。我期望 int_ximage_xdouble_x 在管道处理完该迭代后被释放。换句话说,我期望在执行 getImageproImagesaveRes 迭代 x 期间创建的每个资源在最后一个块完成执行时被释放。但是,此实现将所有对象保留在内存中,直到我退出 TPL 范围。

我错过了什么吗?这是 TPL 的预期行为吗?是否有任何选项可以设置以便在每次迭代结束时释放资源?

更新

根据评论中的建议,我使用 BufferBlockSendAsync 重写了代码,如下所示。但是,我认为这不会导致要求每个任务消耗的资源。设置 BoundedCapacity 只会导致我的程序在我认为它已达到为 BoundedCapacity.

设置的限制时停止。
var blockOpts = new DataflowBlockOptions()
{ BoundedCapacity = 100 };

var imgBuffer = new BufferBlock<int>(blockOpts);

var getImage = new TransformBlock<int, Image>(GetImage, blockOpts);
var proImage = new TransformBlock<Image, double>(ProcessImage, blockOpts);
var SaveRes = new ActionBlock<double>(SaveResult, blockOpts);

var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };

imgBuffer.LinkTo(getImage, linkOptions);
getImage.LinkTo(proImage, linkOptions);
proImage.LinkTo(SaveRes, linkOptions);

for (int x = 0; x < 1000000; x++)
    await imgBuffer.SendAsync(x);
getImage.Complete();

SaveRes.Completion.Wait();

is this the expected behavior of TPL?

是的。它不会 root 所有对象(它们可用于垃圾收集和终结),但它也不会处理它们。

and is there any option to set so the resources are released at the end of each iteration?

没有

how can I can make sure dispose is auto called when the last block/action executed on an input?

要处理对象,您的代码 应该调用 Dispose。这很容易通过修改 ProcessImage 或将其包装在委托中来完成。

如果ProcessImage是同步的:

var proImage = new TransformBlock<Image, double>(image => { using (image) return ProcessImage(image); });

或者如果它是异步的:

var proImage = new TransformBlock<Image, double>(async image => { using (image) return await ProcessImage(image); });