Kotlin 等待 Channel.isClosedForReceive

Kotlin wait for Channel.isClosedForReceive

调用 Channel.close() 后等待 Channel.isClosedForReceive 为真的最佳方法是什么?

我正在按顺序处理消息,并希望 return 调用 Channel.close() 后处理的最大消息数。但是,如果我在调用 close() 后只获取最大处理消息,则在 "close token" 被消耗之前通道中可能有一些消息导致实际最大处理消息大于值 returned.

根据 Channel.close() 的文档,我认为 Channel.isClosedForReceive 是我应该等待的。但我期待一些暂停功能等待,而不是必须轮询其状态。

/**
 * Immediately after invocation of this function
 * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
 * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
 * are received.
 **/

让我们假设您在某种角色中处理所有消息:

val channel = actor<Message> { ... }

最好的方法是了解此参与者处理的最后一条消息是创建一个明确的反向通道来传达此信息。因为我们只需要传递一个值,所以我们使用 CompletableDeferred 这个目的:

val lastProcessed = CompletableDeferred<Message?>() 

现在,您可以按以下方式编写 message-processing 演员:

val actor = actor<Message> {
    var last: Message? = null
    try {
        for (msg in channel) {
            // process message
            last = msg
        }
    } finally {
        // report the last processed message via back channel
        lastProcessed.complete(last)
    }
}

这里是关闭通道并学习最后处理的消息的代码:

actor.close()
val last = lastProcessed.await() // receive last processed message