Dataflowblock 停止更新 UI 但仍在运行操作?

Dataflowblock stops updating UI but still runs the action?

这个问题真的很难调试,不是每次都会发生(不会在短时间内发生,这样我就可以轻松调试代码)而且看起来没有人遇到过类似的问题吗? (我已经用谷歌搜索了几个小时,但没有找到与此问题相关的任何内容)。

简而言之,我的数据流网络在某些时候工作正常,直到我发现终端块(更新 UI)似乎停止工作([=71 上没有更新新数据) =]) 而所有向上的数据流块仍然工作正常。所以其他块和这里的 ui 块之间好像有一些断开连接。

这是我详细的数据流网络,在我解释更多问题之前,让我们先看看:

//the network graph first
[raw data block] 
-> [switching block] -> [data counting block]
                     -> [processing block] -> [ok result block] -> [completion monitoring]
                                           -> [not ok result block] -> [completion monitoring]

//in the UI code behind where I can consume the network and plug-in some other blocks for updating
//like this:
     [ok result block] -> [ok result counting block]
     [not ok result block] -> [other ui updating]

[ok result block] 是一个 BroadcastBlock,它将结果推送到 [ok result counting block]。我在这里部分描述的问题是这个 [ok result counting block] 似乎与 [ok result block].

断开了
var options = new DataflowBlockOptions { EnsureOrdered = false };
var execOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 80 };

//[raw data block]
var rawDataBlock = new BufferBlock<Input>(options);

//[switching block]
var switchingBlock = new TransformManyBlock<Input,Input>(e => new[] {e,null});

//[data counting block]
var dataCountingBlock = new BroadcastBlock<Input>(null);

//[processing block]
var processingBlock = new TransformBlock<Input,int>(async e => {
    //call another api to compute the result
    var result = await …;
    //rollback the input for later processing (some kind of retry)
    if(result < 0){
       //per my logging, there is only one call dropping 
       //in this case
       Task.Run(rollback);
    }
    //local function to rollback
    async Task rollback(){
      await rawDataBlock.SendAsync(e).ConfigureAwait(false);
    }
    return result;
}, execOptions);

//[ok result block]
var okResultBlock = new BroadcastBlock<int>(null, options);

//[not ok result block]
var notOkResultBlock = new BroadcastBlock<int>(null, options);

//[completion monitoring]
var completionMonitoringBlock = new ActionBlock<int>(e => {
     if(rawDataBlock.Completion.IsCompleted && processingBlock.InputCount == 0){
          processingBlock.Complete();
     }
}, execOptions);

//connect the blocks to build the network
rawDataBlock.LinkTo(switchingBlock);
switchingBlock.LinkTo(processingBlock, e => e != null);
switchingBlock.LinkTo(dataCountingBlock, e => e == null);

processingBlock.LinkTo(okResultBlock, e => e >= 9);
processingBlock.LinkTo(notOkResultBlock, e => e < 9);

okResultBlock.LinkTo(completionMonitoringBlock);
notOkResultBlock.LinkTo(completionMonitoringBlock);

在后面的 UI 代码中,我插入了一些其他 UI 块来更新信息。我在这里使用 WPF 但我认为这并不重要:

var uiBlockOptions = new ExecutionDataflowBlockOptions {
     TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
};
dataCountingBlock.LinkTo(new ActionBlock<int>(e => {
      //these are properties in the VM class, which is bound to the UI (xaml view)
      RawInputCount++;
}, uiBlockOptions));

okResultBlock.LinkTo(new ActionBlock<int>(e => {
      //these are properties in the VM class, which is bound to the UI (xaml view)
      ProcessedCount++;
      OkResultCount++;
}, uiBlockOptions));

notOkResultBlock.LinkTo(new ActionBlock<int>(e => {
      //these are properties in the VM class, which is bound to the UI (xaml view)
      ProcessedCount++;
      PendingCount = processingBlock.InputCount;
}, uiBlockOptions));

我确实有代码监控块的完成状态:rawDataBlockprocessingBlockokResultBlocknotOkResultBlock。 我在 processingBlock 中还有其他日志记录代码来帮助诊断。

所以正如我所说,经过相当长的时间(大约 1 小时处理了大约 60 万个项目,实际上这个数字没有说明问题,它可能是随机的),网络似乎仍然 运行很好,除了一些计数(好的结果,不好的结果)没有更新,就好像 okResultBlocknotOkResultBlockprocessingBlock 断开或者它们与 [=71= 断开一样] 块(更新 UI)。我确保 processingBlock 仍在工作(未记录异常并且结果仍写入文件),dataCountingBlock 仍在正常工作(在 UI 上更新了新计数),所有块 processingBlockokResultBlocknotOkResultBlock 均未完成(它们的完成是 .ContinueWith 一项注销状态但未记录任何内容的任务)。

所以它真的卡在那里了。我不知道为什么它会那样停止工作。这只有在我们使用像 TPL Dataflow 这样的黑盒库时才会发生。我知道您可能也很难诊断、想象和思考各种可能性。我只是在这里询问解决此问题的建议以及您的任何共享经验(关于类似问题)以及可能导致 TPL Dataflow

中此类问题的一些猜测

更新:

我已经成功重现了一次错误,之前我已经准备了一些代码来记录一些信息以帮助调试。问题现在归结为这一点:processingBlock 实际上并没有 push/post/send 任何消息到所有链接块(包括 okResultBlocknotOkResultBlock)甚至一个新的链接到它的块(前缀为 DataflowLinkOptionsAppend 为 false)无法接收任何消息(结果)。正如我所说,processBlock 似乎仍然可以正常工作(它的 Action 执行 运行 内部代码并正常生成结果日志记录)。所以这还是一个很奇怪的问题。

简而言之,现在的问题是为什么 processBlock 不能 send/post 将其消息发送到所有其他链接块?是否有任何可能的原因导致这种情况发生?如何知道块是否链接成功(调用.LinkTo后)?

这实际上是我的错,processingBlock 实际上被阻止了,但它被正确地阻止了并且以一种好的方式(通过设计)。

processingBlock 被 2 个因素阻止:

  • EnsureOrderedtrue(默认情况下),因此输出始终按处理顺序排队。
  • 至少有一个输出结果无法被推出(到其他区块)。

因此,如果一个输出结果不能被推出,它将成为阻塞项,因为所有输出结果都按处理顺序排队。所有后处理的输出结果将简单地被无法推出的第一个输出结果阻塞(排队)。

在我这里无法推出的特殊输出结果是null结果。该空结果只能由某些错误(异常处理)产生。所以我有 2 个块 okResultBlocknotOkResultBlock 链接到 processingBlock。但是这两个块都被过滤,只让 non-null 结果通过。抱歉,我的问题没有反映出我所拥有的关于输出类型的确切代码。在问题中它只是一个简单的 int 但实际上它是一个 class (可为空),实际的链接代码如下所示:

processingBlock.LinkTo(okResultBlock, e => e != null && e.Point >= 9);
processingBlock.LinkTo(notOkResultBlock, e => e != null && e.Point < 9);

因此 null 输出结果将被阻止并因此阻止所有后处理结果(因为选项 EnsureOrdered 默认为 true)。

为了解决这个问题,我只是简单地将 EnsureOrdered 设置为 false (虽然这不是避免阻塞所必需的,但对我来说这很好)并添加一个块来消耗null 输出结果(这是帮助避免阻塞最重要的):

processingBlock.LinkTo(DataflowBlock.NullTarget<Output>(), e => e == null);