BufferBlock 在 OutputAvailableAsync 上被阻塞

BufferBlock getting blocked on OutputAvailableAsync

我刚开始使用 TPL。我已经能够在 BufferBlock 上实现一个简单的 post/receive 事务,但是当我尝试异步时,我被挂了。

这是我正在尝试的简化版本。

声明消息缓冲区(使其成为全局缓冲区)

Dim msgBuffer As New BufferBlock(Of String)

post 消息的简单函数,消息只是目录中的文件名列表

Private Sub PostMessagesToBuffer()

    filesList = Directory.GetFiles(txtBoxSrcFilesDir.Text, fileFilter).ToList
    For Each file In filesList
        msgBuffer.Post(file)
    Next
    msgBuffer.Complete()
End Sub

我创建了这个函数来处理消息:

Private Async Function ProcessMessagesAsync() As Task(Of Integer)

    Dim msgsProcessed = 0

    While Await msgBuffer.OutputAvailableAsync
        Dim msg = msgBuffer.Receive
        Console.WriteLine(msg)
        msgsProcessed += 1
    End While

    Return msgsProcessed

End Function

那么就这一套调用运行就全了。

Dim msgProcessor = ProcessMessagesAsync()
PostMessagesToBuffer()
msgProcessor.Wait()
Console.writeLine("Processed " & msgProcessor.Result & " Messages.")

我可以调试并看到消息被添加到缓冲区,但 "While Await" 永远不会收到消息在队列中可用的信号。它只是坐在那里,永远不会进入循环来处理消息。我在这里错过了一些相当简单的东西吗?

您正在混合同步和异步代码。 blocking on async code 时死锁很常见。而不是 Wait:

msgProcessor.Wait()

...Await更安全:

Await msgProcessor

还有DataflowBlock.Receive is a blocking method. The proper method to use after awaiting the DataflowBlock.OutputAvailableAsync is the method DataflowBlock.TryReceive.

我还应该指出,手动检索 DataflowBlock 的元素是可行的,但并不常见。通常数据流管道中的最后一个块是 ActionBlock,它不会生成输出。