TPL:处理已处理的项目
TPL: Dispose processed items
在 C# 中,我使用 Task Parallel Library (TPL) 下载图像、处理图像并保存分析结果。简化代码如下。
var getImage = new TransformBlock<int, Image>(GetImage);
var proImage = new TransformBlock<Image, double>(ProcessImage);
var saveRes = new ActionBlock<double>(SaveResult);
var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
getImage.LinkTo(proImage, linkOptions);
proImage.LinkTo(SaveRes, linkOptions);
for (int x = 0; x < 1000000; x++)
getImage.Post(x);
getImage.Complete();
SaveRes.Completion.Wait();
除内存使用情况外,这按预期工作。我期望 int_x
、image_x
和 double_x
在管道处理完该迭代后被释放。换句话说,我期望在执行 getImage
、proImage
和 saveRes
迭代 x
期间创建的每个资源在最后一个块完成执行时被释放。但是,此实现将所有对象保留在内存中,直到我退出 TPL 范围。
我错过了什么吗?这是 TPL 的预期行为吗?是否有任何选项可以设置以便在每次迭代结束时释放资源?
更新
根据评论中的建议,我使用 BufferBlock
和 SendAsync
重写了代码,如下所示。但是,我认为这不会导致要求每个任务消耗的资源。设置 BoundedCapacity
只会导致我的程序在我认为它已达到为 BoundedCapacity
.
设置的限制时停止。
var blockOpts = new DataflowBlockOptions()
{ BoundedCapacity = 100 };
var imgBuffer = new BufferBlock<int>(blockOpts);
var getImage = new TransformBlock<int, Image>(GetImage, blockOpts);
var proImage = new TransformBlock<Image, double>(ProcessImage, blockOpts);
var SaveRes = new ActionBlock<double>(SaveResult, blockOpts);
var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
imgBuffer.LinkTo(getImage, linkOptions);
getImage.LinkTo(proImage, linkOptions);
proImage.LinkTo(SaveRes, linkOptions);
for (int x = 0; x < 1000000; x++)
await imgBuffer.SendAsync(x);
getImage.Complete();
SaveRes.Completion.Wait();
is this the expected behavior of TPL?
是的。它不会 root 所有对象(它们可用于垃圾收集和终结),但它也不会处理它们。
and is there any option to set so the resources are released at the end of each iteration?
没有
how can I can make sure dispose is auto called when the last block/action executed on an input?
要处理对象,您的代码 应该调用 Dispose
。这很容易通过修改 ProcessImage
或将其包装在委托中来完成。
如果ProcessImage
是同步的:
var proImage = new TransformBlock<Image, double>(image => { using (image) return ProcessImage(image); });
或者如果它是异步的:
var proImage = new TransformBlock<Image, double>(async image => { using (image) return await ProcessImage(image); });
在 C# 中,我使用 Task Parallel Library (TPL) 下载图像、处理图像并保存分析结果。简化代码如下。
var getImage = new TransformBlock<int, Image>(GetImage);
var proImage = new TransformBlock<Image, double>(ProcessImage);
var saveRes = new ActionBlock<double>(SaveResult);
var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
getImage.LinkTo(proImage, linkOptions);
proImage.LinkTo(SaveRes, linkOptions);
for (int x = 0; x < 1000000; x++)
getImage.Post(x);
getImage.Complete();
SaveRes.Completion.Wait();
除内存使用情况外,这按预期工作。我期望 int_x
、image_x
和 double_x
在管道处理完该迭代后被释放。换句话说,我期望在执行 getImage
、proImage
和 saveRes
迭代 x
期间创建的每个资源在最后一个块完成执行时被释放。但是,此实现将所有对象保留在内存中,直到我退出 TPL 范围。
我错过了什么吗?这是 TPL 的预期行为吗?是否有任何选项可以设置以便在每次迭代结束时释放资源?
更新
根据评论中的建议,我使用 BufferBlock
和 SendAsync
重写了代码,如下所示。但是,我认为这不会导致要求每个任务消耗的资源。设置 BoundedCapacity
只会导致我的程序在我认为它已达到为 BoundedCapacity
.
var blockOpts = new DataflowBlockOptions()
{ BoundedCapacity = 100 };
var imgBuffer = new BufferBlock<int>(blockOpts);
var getImage = new TransformBlock<int, Image>(GetImage, blockOpts);
var proImage = new TransformBlock<Image, double>(ProcessImage, blockOpts);
var SaveRes = new ActionBlock<double>(SaveResult, blockOpts);
var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
imgBuffer.LinkTo(getImage, linkOptions);
getImage.LinkTo(proImage, linkOptions);
proImage.LinkTo(SaveRes, linkOptions);
for (int x = 0; x < 1000000; x++)
await imgBuffer.SendAsync(x);
getImage.Complete();
SaveRes.Completion.Wait();
is this the expected behavior of TPL?
是的。它不会 root 所有对象(它们可用于垃圾收集和终结),但它也不会处理它们。
and is there any option to set so the resources are released at the end of each iteration?
没有
how can I can make sure dispose is auto called when the last block/action executed on an input?
要处理对象,您的代码 应该调用 Dispose
。这很容易通过修改 ProcessImage
或将其包装在委托中来完成。
如果ProcessImage
是同步的:
var proImage = new TransformBlock<Image, double>(image => { using (image) return ProcessImage(image); });
或者如果它是异步的:
var proImage = new TransformBlock<Image, double>(async image => { using (image) return await ProcessImage(image); });