只有 Kotlin 协程的数据流?
Streams of data with only Kotlin Coroutines?
使用 RxJava 我已经习惯了我的存储库返回数据的 Observables,只要有潜在的变化就会自动更新。我通过简单地在我的存储库中有一个主题来实现这一点,该主题会收到相关更改信息的通知,并且像 getAll()
这样的可观察对象会消失。
举个例子,把这段伪代码当作片段:
fun getAll(): Observable<List<Model> {
subject
.filter { isChangeRelevant(it) }
.startWith(initialChangeEvent)
.map { queryAll() }
}
我很好奇如何以及是否可以仅使用协程来实现同样的事情?
您可以使用 Kotlin 协程 Channel
s.
如果您只想像流一样发出您的值(这样您就可以 for-each
离开它)您可以使用 produce
来创建它们(returns ReceiveChannel
):
fun test(): ReceiveChannel<Int>{
return produce {
send(1)
send(5)
send(100)
}
}
您可以对 test()
的值使用 for-each(或 consumeEach
)来接收其值。
如果您希望您的频道与 RxJava 的 PublishSubject 完全一样,您可以使用 ConflatedBroadCastChannel,并向其发出值:
val broadCastChannel = ConflatedBroadcastChannel<Int>()
您可以使用 broadCastChannel.offer(value)
将值发送到通道。
要从通道接收值,您可以使用简单的 for-each 循环:
for (i in broadCastChannel.openSubscription()) {
//your values
}
使用 RxJava 我已经习惯了我的存储库返回数据的 Observables,只要有潜在的变化就会自动更新。我通过简单地在我的存储库中有一个主题来实现这一点,该主题会收到相关更改信息的通知,并且像 getAll()
这样的可观察对象会消失。
举个例子,把这段伪代码当作片段:
fun getAll(): Observable<List<Model> {
subject
.filter { isChangeRelevant(it) }
.startWith(initialChangeEvent)
.map { queryAll() }
}
我很好奇如何以及是否可以仅使用协程来实现同样的事情?
您可以使用 Kotlin 协程 Channel
s.
如果您只想像流一样发出您的值(这样您就可以 for-each
离开它)您可以使用 produce
来创建它们(returns ReceiveChannel
):
fun test(): ReceiveChannel<Int>{
return produce {
send(1)
send(5)
send(100)
}
}
您可以对 test()
的值使用 for-each(或 consumeEach
)来接收其值。
如果您希望您的频道与 RxJava 的 PublishSubject 完全一样,您可以使用 ConflatedBroadCastChannel,并向其发出值:
val broadCastChannel = ConflatedBroadcastChannel<Int>()
您可以使用 broadCastChannel.offer(value)
将值发送到通道。
要从通道接收值,您可以使用简单的 for-each 循环:
for (i in broadCastChannel.openSubscription()) {
//your values
}