转换块链接到多个目标,谓词未按预期工作
Transform Block linking to multiple targets with predicate not working as expected
我有一个基于谓词链接到动作块的转换块。
// Blocks
public TransformBlock<Document, Document> DocumentCreationTransformBlock =
new TransformBlock<Document, Document>(async document =>
{
return await CreateAsync(document); // REST API call that sets document.NewId
},
new ExecutionDataflowBlockOptions {
BoundedCapacity = 100,
MaxDegreeOfParallelism = 20
});
public ActionBlock<Document> SplitPipelineActionBlock =
new ActionBlock<Document>(async document =>
{ // implementation obfuscated
},
new ExecutionDataflowBlockOptions {
BoundedCapacity = 100
};
// Shared block elements
public DataflowLinkOptions CommonLinkOptions = new DataflowLinkOptions {
PropagateCompletion = true };
// Link mesh
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
CommonLinkOptions,
document => !string.IsNullOrEmpty(document.NewId));
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
CommonLinkOptions);
转换块尝试通过 REST API 创建文档。它应该用 NewId
更新 Document
对象。因此 LinkTo 谓词检查返回的 Document 是否具有 NewId。
对于任何不符合此条件的对象,有一个 NullTarget
块来清空 TransformBlock
。
在我的测试中,我向管道发布了 10,100 个项目并确认所有项目都成功返回 NewId
。但是,正在将 130 个项目传递给 NullTarget
。当我再次在整个集合上重新运行该程序时,超过 3000 个项目被传递到 NullTarget
。甚至以前成功存储的项目 NewId
。
我怀疑 SplitPipelineActionBlock
BoundedCapacity
已满,而 LinkTo
只是忽略谓词,然后将要处理的项目传递给下一个 LinkTo,这是 NullTarget
.
我怎样才能让所有物品都有机会被发送到 SplitPipeLineAction
区块?
当源块中有一个项目可用时,它会一次一个地提供给它的link;如果任何 link 没有收到该物品,则将该物品提供给下一个 link。没有区分 为什么 没有使用 link。块确实区分 "maybe later" 回复和 "no" 回复(Postponed
vs Declined
),但在任何一种情况下,下一个 link 将尝试一个 linked 块,可以现在就拿.
解决此问题的最佳选择是向空块 link 添加一个谓词,它是目标块 link.
的谓词的否定
Predicate<Document> predicate = document => !string.IsNullOrEmpty(document.NewId);
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
CommonLinkOptions,
predicate);
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
CommonLinkOptions,
document => !predicate(document));
这样,当 SplitPipelineActionBlock
已满时,该项目将提供给 null link,由于谓词失败而被拒绝,该项目将保留在转换块输出缓冲区中,直到SplitPipelineActionBlock
有空间。
我有一个基于谓词链接到动作块的转换块。
// Blocks
public TransformBlock<Document, Document> DocumentCreationTransformBlock =
new TransformBlock<Document, Document>(async document =>
{
return await CreateAsync(document); // REST API call that sets document.NewId
},
new ExecutionDataflowBlockOptions {
BoundedCapacity = 100,
MaxDegreeOfParallelism = 20
});
public ActionBlock<Document> SplitPipelineActionBlock =
new ActionBlock<Document>(async document =>
{ // implementation obfuscated
},
new ExecutionDataflowBlockOptions {
BoundedCapacity = 100
};
// Shared block elements
public DataflowLinkOptions CommonLinkOptions = new DataflowLinkOptions {
PropagateCompletion = true };
// Link mesh
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
CommonLinkOptions,
document => !string.IsNullOrEmpty(document.NewId));
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
CommonLinkOptions);
转换块尝试通过 REST API 创建文档。它应该用 NewId
更新 Document
对象。因此 LinkTo 谓词检查返回的 Document 是否具有 NewId。
对于任何不符合此条件的对象,有一个 NullTarget
块来清空 TransformBlock
。
在我的测试中,我向管道发布了 10,100 个项目并确认所有项目都成功返回 NewId
。但是,正在将 130 个项目传递给 NullTarget
。当我再次在整个集合上重新运行该程序时,超过 3000 个项目被传递到 NullTarget
。甚至以前成功存储的项目 NewId
。
我怀疑 SplitPipelineActionBlock
BoundedCapacity
已满,而 LinkTo
只是忽略谓词,然后将要处理的项目传递给下一个 LinkTo,这是 NullTarget
.
我怎样才能让所有物品都有机会被发送到 SplitPipeLineAction
区块?
当源块中有一个项目可用时,它会一次一个地提供给它的link;如果任何 link 没有收到该物品,则将该物品提供给下一个 link。没有区分 为什么 没有使用 link。块确实区分 "maybe later" 回复和 "no" 回复(Postponed
vs Declined
),但在任何一种情况下,下一个 link 将尝试一个 linked 块,可以现在就拿.
解决此问题的最佳选择是向空块 link 添加一个谓词,它是目标块 link.
的谓词的否定Predicate<Document> predicate = document => !string.IsNullOrEmpty(document.NewId);
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock,
CommonLinkOptions,
predicate);
DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(),
CommonLinkOptions,
document => !predicate(document));
这样,当 SplitPipelineActionBlock
已满时,该项目将提供给 null link,由于谓词失败而被拒绝,该项目将保留在转换块输出缓冲区中,直到SplitPipelineActionBlock
有空间。