C# TPL 数据流 ReceiveAsync() 未完成但任务显示已完成

C# TPL Dataflow ReceiveAsync() is not completed but task shows completed

我创建了如下所示的 TPL 数据流管道

BufferBlock --> TransformBlock --> TransformBlock--> TransformBlock --> BufferBlock

因为所有数据块都用ExecutionDataflowBlockOptions.EnsureOrdered = true初始化。

所有块都按上述顺序正确链接 DataflowLinkOptions.PropagateCompletion = true

所以我按如下所示的方式使用数据流。

public async Task Process(IMessage message)
{
      await (ITargetBlock<IMessage>)firstBlock.SendAsync(message);
      await (ISourceBlock<IMessage>)lastBlock.ReceiveAsync();
}

然后调用此方法,然后如下所示进行处理

    await Process(message).ContinueWith(task => HandleProcessedMessage(message,task));

在其中一个转换块中设置了 ResponseMessage(字符串)属性 的值(在 IMessage 内)。

这发生在 SignalR 环境中。一旦我们一次处理 0 到 100 条消息,一切正常,一旦我们处理 499 条消息,即使执行到达 HandleProcessedMessage 方法,ResponseMessage 属性 为空。如果我在 HandleProcessedMessage 中添加一个延迟 Task.Delay(500).Wait(),那么在那之后,属性 就会有它的值。有没有我遗漏的东西或者与线程相关的东西?

我还验证了该值始终是从转换块设置的,并且没有发生异常。所有请求都在管道内按顺序处理。

如果有人正在寻找解决方案。我认为当多个线程一起工作时会发生错误,我们以一种方式解决了它,而不是返回任务进程,我们将方法更改为。

public async Task<IMessage> Process(IMessage message)
{
      await (ITargetBlock<IMessage>)firstBlock.SendAsync(message);
      message = await (ISourceBlock<IMessage>)lastBlock.ReceiveAsync();
      return message;
}

调用此方法已更改为。

    IMessage processedMessage = await Process(message).ConfigureAwait(false);
    HandleProcessedMessage(processedMessage);

这解决了即使在管道之后数据仍为空的问题。管道中不需要更改。感谢大家的回复。