转换块链接到多个目标,谓词未按预期工作

Transform Block linking to multiple targets with predicate not working as expected

我有一个基于谓词链接到动作块的转换块。

// Blocks
public TransformBlock<Document, Document> DocumentCreationTransformBlock =
    new TransformBlock<Document, Document>(async document => 
    {          
        return await CreateAsync(document); // REST API call that sets document.NewId
    }, 
    new ExecutionDataflowBlockOptions {
        BoundedCapacity = 100,
        MaxDegreeOfParallelism = 20
    });

public ActionBlock<Document> SplitPipelineActionBlock = 
    new ActionBlock<Document>(async document => 
    { // implementation obfuscated 
    },
    new ExecutionDataflowBlockOptions {
        BoundedCapacity = 100           
    }; 

// Shared block elements      
public DataflowLinkOptions CommonLinkOptions = new DataflowLinkOptions {
    PropagateCompletion = true };

// Link mesh
DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock, 
    CommonLinkOptions, 
    document => !string.IsNullOrEmpty(document.NewId));

DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(), 
    CommonLinkOptions);

转换块尝试通过 REST API 创建文档。它应该用 NewId 更新 Document 对象。因此 LinkTo 谓词检查返回的 Document 是否具有 NewId。

对于任何不符合此条件的对象,有一个 NullTarget 块来清空 TransformBlock

在我的测试中,我向管道发布了 10,100 个项目并确认所有项目都成功返回 NewId。但是,正在将 130 个项目传递给 NullTarget。当我再次在整个集合上重新运行该程序时,超过 3000 个项目被传递到 NullTarget。甚至以前成功存储的项目 NewId

我怀疑 SplitPipelineActionBlock BoundedCapacity 已满,而 LinkTo 只是忽略谓词,然后将要处理的项目传递给下一个 LinkTo,这是 NullTarget.

我怎样才能让所有物品都有机会被发送到 SplitPipeLineAction 区块?

当源块中有一个项目可用时,它会一次一个地提供给它的link;如果任何 link 没有收到该物品,则将该物品提供给下一个 link。没有区分 为什么 没有使用 link。块确实区分 "maybe later" 回复和 "no" 回复(Postponed vs Declined),但在任何一种情况下,下一个 link 将尝试一个 linked 块,可以现在就拿.

解决此问题的最佳选择是向空块 link 添加一个谓词,它是目标块 link.

的谓词的否定
Predicate<Document> predicate = document => !string.IsNullOrEmpty(document.NewId);

DocumentCreationTransformBlock.LinkTo(SplitPipelineActionBlock, 
    CommonLinkOptions, 
    predicate);

DocumentCreationTransformBlock.LinkTo(DataflowBlock.NullTarget<Document>(), 
    CommonLinkOptions,
    document => !predicate(document));

这样,当 SplitPipelineActionBlock 已满时,该项目将提供给 null link,由于谓词失败而被拒绝,该项目将保留在转换块输出缓冲区中,直到SplitPipelineActionBlock 有空间。