C# TPL 数据流 ReceiveAsync() 未完成但任务显示已完成
C# TPL Dataflow ReceiveAsync() is not completed but task shows completed
我创建了如下所示的 TPL 数据流管道
BufferBlock --> TransformBlock --> TransformBlock--> TransformBlock --> BufferBlock
因为所有数据块都用ExecutionDataflowBlockOptions.EnsureOrdered = true
初始化。
所有块都按上述顺序正确链接 DataflowLinkOptions.PropagateCompletion = true
。
所以我按如下所示的方式使用数据流。
public async Task Process(IMessage message)
{
await (ITargetBlock<IMessage>)firstBlock.SendAsync(message);
await (ISourceBlock<IMessage>)lastBlock.ReceiveAsync();
}
然后调用此方法,然后如下所示进行处理
await Process(message).ContinueWith(task => HandleProcessedMessage(message,task));
在其中一个转换块中设置了 ResponseMessage
(字符串)属性 的值(在 IMessage
内)。
这发生在 SignalR 环境中。一旦我们一次处理 0 到 100 条消息,一切正常,一旦我们处理 499 条消息,即使执行到达 HandleProcessedMessage
方法,ResponseMessage
属性 为空。如果我在 HandleProcessedMessage
中添加一个延迟 Task.Delay(500).Wait()
,那么在那之后,属性 就会有它的值。有没有我遗漏的东西或者与线程相关的东西?
我还验证了该值始终是从转换块设置的,并且没有发生异常。所有请求都在管道内按顺序处理。
如果有人正在寻找解决方案。我认为当多个线程一起工作时会发生错误,我们以一种方式解决了它,而不是返回任务进程,我们将方法更改为。
public async Task<IMessage> Process(IMessage message)
{
await (ITargetBlock<IMessage>)firstBlock.SendAsync(message);
message = await (ISourceBlock<IMessage>)lastBlock.ReceiveAsync();
return message;
}
调用此方法已更改为。
IMessage processedMessage = await Process(message).ConfigureAwait(false);
HandleProcessedMessage(processedMessage);
这解决了即使在管道之后数据仍为空的问题。管道中不需要更改。感谢大家的回复。
我创建了如下所示的 TPL 数据流管道
BufferBlock --> TransformBlock --> TransformBlock--> TransformBlock --> BufferBlock
因为所有数据块都用ExecutionDataflowBlockOptions.EnsureOrdered = true
初始化。
所有块都按上述顺序正确链接 DataflowLinkOptions.PropagateCompletion = true
。
所以我按如下所示的方式使用数据流。
public async Task Process(IMessage message)
{
await (ITargetBlock<IMessage>)firstBlock.SendAsync(message);
await (ISourceBlock<IMessage>)lastBlock.ReceiveAsync();
}
然后调用此方法,然后如下所示进行处理
await Process(message).ContinueWith(task => HandleProcessedMessage(message,task));
在其中一个转换块中设置了 ResponseMessage
(字符串)属性 的值(在 IMessage
内)。
这发生在 SignalR 环境中。一旦我们一次处理 0 到 100 条消息,一切正常,一旦我们处理 499 条消息,即使执行到达 HandleProcessedMessage
方法,ResponseMessage
属性 为空。如果我在 HandleProcessedMessage
中添加一个延迟 Task.Delay(500).Wait()
,那么在那之后,属性 就会有它的值。有没有我遗漏的东西或者与线程相关的东西?
我还验证了该值始终是从转换块设置的,并且没有发生异常。所有请求都在管道内按顺序处理。
如果有人正在寻找解决方案。我认为当多个线程一起工作时会发生错误,我们以一种方式解决了它,而不是返回任务进程,我们将方法更改为。
public async Task<IMessage> Process(IMessage message)
{
await (ITargetBlock<IMessage>)firstBlock.SendAsync(message);
message = await (ISourceBlock<IMessage>)lastBlock.ReceiveAsync();
return message;
}
调用此方法已更改为。
IMessage processedMessage = await Process(message).ConfigureAwait(false);
HandleProcessedMessage(processedMessage);
这解决了即使在管道之后数据仍为空的问题。管道中不需要更改。感谢大家的回复。