我怎样才能获得以前发布的 Kotlin Flow?

How can I get a previous emission Kotlin Flow?

让我用一个简单的图像来说明我想要得到的东西:

我不想使用 SharedFlowreplayCache 来实现这一点,因为如果一个新的观察者观察到 SharedFlow,它将得到 2 个发射而不是一个最新的发射.

或者如果我用代码写:

val sharedFlow = MutableSharedFlow(replay = 1)
val theFlowThatIWant = sharedFlow.unknownOperator { … }
sharedFlow.emit(1)
sharedFlow.emit(2)

sharedFlow.collect {
   println(it)
}

Expected output:
2

theFlowThatIWant.collect {
   println(it)
}

Expected output:
1

我们可以自己创建这样的算子。我们可以将它推广到更多的项目,而不仅仅是最后一个项目,并使用循环缓冲区来保留推迟的项目:

suspend fun main() {
    val f = flow {
        repeat(5) {
            println("Emitting $it")
            emit(it)
            delay(1000)
        }
    }

    f.postponeLast()
        .collect { println("Collecting $it") }
}

fun <T> Flow<T>.postponeLast(count: Int = 1): Flow<T> = flow {
    val buffer = ArrayDeque<T>(count)
    collect {
        if (buffer.size == count) {
            emit(buffer.removeFirst())
        }
        buffer.addLast(it)
    }
}

请注意,此解决方案从不发出推迟的项目。如果您想在最后发出它们,只需在 collect { }:

之后添加
while (buffer.isNotEmpty()) {
    emit(buffer.removeFirst())
}