只有 Kotlin 协程的数据流?

Streams of data with only Kotlin Coroutines?

使用 RxJava 我已经习惯了我的存储库返回数据的 Observables,只要有潜在的变化就会自动更新。我通过简单地在我的存储库中有一个主题来实现这一点,该主题会收到相关更改信息的通知,并且像 getAll() 这样的可观察对象会消失。

举个例子,把这段伪代码当作片段:

fun getAll(): Observable<List<Model> {
    subject
        .filter { isChangeRelevant(it) }
        .startWith(initialChangeEvent)
        .map { queryAll() }
}

我很好奇如何以及是否可以仅使用协程来实现同样的事情?

您可以使用 Kotlin 协程 Channels.

如果您只想像流一样发出您的值(这样您就可以 for-each 离开它)您可以使用 produce 来创建它们(returns ReceiveChannel):

fun test(): ReceiveChannel<Int>{
        return produce {
            send(1)
            send(5)
            send(100)
        }
    }

您可以对 test() 的值使用 for-each(或 consumeEach)来接收其值。

如果您希望您的频道与 RxJava 的 PublishSubject 完全一样,您可以使用 ConflatedBroadCastChannel,并向其发出值:

val broadCastChannel = ConflatedBroadcastChannel<Int>()

您可以使用 broadCastChannel.offer(value) 将值发送到通道。

要从通道接收值,您可以使用简单的 for-each 循环:

for (i in broadCastChannel.openSubscription()) {
       //your values
}