PublishSubject 与 Kotlin 协程(流程)
PublishSubject with Kotlin coroutines (Flow)
我使用了 PublishSubject,我正在向它发送消息,同时我也在监听结果。它工作完美,但现在我不确定如何使用 Kotlin 的协程(流或通道)做同样的事情。
private val subject = PublishProcessor.create<Boolean>>()
...
fun someMethod(b: Boolean) {
subject.onNext(b)
}
fun observe() {
subject.debounce(500, TimeUnit.MILLISECONDS)
.subscribe { /* value received */ }
}
因为我需要 debounce 运算符,所以我真的想对流做同样的事情,所以我创建了一个通道,然后我尝试从该通道创建一个流并监听变化,但我没有得到任何结果.
private val channel = Channel<Boolean>()
...
fun someMethod(b: Boolean) {
channel.send(b)
}
fun observe() {
flow {
channel.consumeEach { value ->
emit(value)
}
}.debounce(500, TimeUnit.MILLISECONDS)
.onEach {
// value received
}
}
怎么了?
Flow
是冷异步流,就像 Observable
.
All transformations on the flow, such as map
and filter
do not trigger flow collection or execution, only terminal operators (e.g. single
) do trigger it.
onEach
方法只是一个转换。因此,您应该将其替换为终端流运算符 collect
。您也可以使用 BroadcastChannel
来获得更清晰的代码:
private val channel = BroadcastChannel<Boolean>(1)
suspend fun someMethod(b: Boolean) {
channel.send(b)
}
suspend fun observe() {
channel
.asFlow()
.debounce(500)
.collect {
// value received
}
}
更新: 在提出问题时,有两个参数的 debounce
过载(如问题中所示)。没有了。但是现在有一个以毫秒(长)为单位的参数。
Kotlin 协程中的 ArrayBroadcastChannel 与 PublishSubject 最相似。
- 像 PublishSubject 一样,一个 ArrayBroadcastChannel 可以有多个
订阅者和所有活跃订阅者都会立即收到通知。
- 与 PublishSubject 一样,如果此时没有活动订阅者,推送到此频道的事件将丢失。
与 PublishSubject 不同,背压内置于协程通道中,这就是缓冲容量的来源。这个数字实际上取决于通道用于哪种用例。对于大多数正常用例,我只选择 10 个,这应该绰绰有余。如果你将事件推送到这个通道的速度比接收者消耗它的速度快,你可以提供更多的容量。
对于PublishProcessor/PublishRelay
应该是SharedFlow/MutableSharedFlow
private val _myFlow = MutableSharedFlow<Boolean>(
replay = 0,
extraBufferCapacity = 1, // you can increase
BufferOverflow.DROP_OLDEST
)
val myFlow = _myFlow.asSharedFlow()
// ...
fun someMethod(b: Boolean) {
_myFlow.tryEmit(b)
}
fun observe() {
myFlow.debounce(500)
.onEach { }
// flowOn(), catch{}
.launchIn(coroutineScope)
}
和 StateFlow/MutableStateFlow
为 BehaviorProcessor/BehaviorRelay
。
private val _myFlow = MutableStateFlow<Boolean>(false)
val myFlow = _myFlow.asStateFlow()
// ...
fun someMethod(b: Boolean) {
_myFlow.value = b // same as _myFlow.emit(v), myFlow.tryEmit(b)
}
fun observe() {
myFlow.debounce(500)
.onEach { }
// flowOn(), catch{}
.launchIn(coroutineScope)
}
StateFlow
必须有初始值,如果你不想这样,这是解决方法:
private val _myFlow = MutableStateFlow<Boolean?>(null)
val myFlow = _myFlow.asStateFlow()
.filterNotNull()
MutableStateFlow
在设置新值时使用 .equals
比较,因此它不会一次又一次地发出相同的值(相对于使用引用比较的 distinctUntilChanged
)。
所以MutableStateFlow
≈BehaviorProcessor.distinctUntilChanged()
。如果你想要精确的 BehaviorProcessor
行为,那么你可以使用这个:
private val _myFlow = MutableSharedFlow<Boolean>(
replay = 1,
extraBufferCapacity = 0,
BufferOverflow.DROP_OLDEST
)
实际上 BroadcastChannel
已经过时了,Jetbrains 改变了他们使用 SharedFlows
的方法。这更简洁,更容易实施并解决了很多痛点。
基本上,你可以像这样实现同样的事情。
class BroadcastEventBus {
private val _events = MutableSharedFlow<Event>()
val events = _events.asSharedFlow() // read-only public view
suspend fun postEvent(event: Event) {
_events.emit(event) // suspends until subscribers receive it
}
}
要了解更多信息,请查看 Roman 的 Medium 文章。
我使用了 PublishSubject,我正在向它发送消息,同时我也在监听结果。它工作完美,但现在我不确定如何使用 Kotlin 的协程(流或通道)做同样的事情。
private val subject = PublishProcessor.create<Boolean>>()
...
fun someMethod(b: Boolean) {
subject.onNext(b)
}
fun observe() {
subject.debounce(500, TimeUnit.MILLISECONDS)
.subscribe { /* value received */ }
}
因为我需要 debounce 运算符,所以我真的想对流做同样的事情,所以我创建了一个通道,然后我尝试从该通道创建一个流并监听变化,但我没有得到任何结果.
private val channel = Channel<Boolean>()
...
fun someMethod(b: Boolean) {
channel.send(b)
}
fun observe() {
flow {
channel.consumeEach { value ->
emit(value)
}
}.debounce(500, TimeUnit.MILLISECONDS)
.onEach {
// value received
}
}
怎么了?
Flow
是冷异步流,就像 Observable
.
All transformations on the flow, such as
map
andfilter
do not trigger flow collection or execution, only terminal operators (e.g.single
) do trigger it.
onEach
方法只是一个转换。因此,您应该将其替换为终端流运算符 collect
。您也可以使用 BroadcastChannel
来获得更清晰的代码:
private val channel = BroadcastChannel<Boolean>(1)
suspend fun someMethod(b: Boolean) {
channel.send(b)
}
suspend fun observe() {
channel
.asFlow()
.debounce(500)
.collect {
// value received
}
}
更新: 在提出问题时,有两个参数的 debounce
过载(如问题中所示)。没有了。但是现在有一个以毫秒(长)为单位的参数。
Kotlin 协程中的 ArrayBroadcastChannel 与 PublishSubject 最相似。
- 像 PublishSubject 一样,一个 ArrayBroadcastChannel 可以有多个 订阅者和所有活跃订阅者都会立即收到通知。
- 与 PublishSubject 一样,如果此时没有活动订阅者,推送到此频道的事件将丢失。
与 PublishSubject 不同,背压内置于协程通道中,这就是缓冲容量的来源。这个数字实际上取决于通道用于哪种用例。对于大多数正常用例,我只选择 10 个,这应该绰绰有余。如果你将事件推送到这个通道的速度比接收者消耗它的速度快,你可以提供更多的容量。
对于PublishProcessor/PublishRelay
SharedFlow/MutableSharedFlow
private val _myFlow = MutableSharedFlow<Boolean>(
replay = 0,
extraBufferCapacity = 1, // you can increase
BufferOverflow.DROP_OLDEST
)
val myFlow = _myFlow.asSharedFlow()
// ...
fun someMethod(b: Boolean) {
_myFlow.tryEmit(b)
}
fun observe() {
myFlow.debounce(500)
.onEach { }
// flowOn(), catch{}
.launchIn(coroutineScope)
}
和 StateFlow/MutableStateFlow
为 BehaviorProcessor/BehaviorRelay
。
private val _myFlow = MutableStateFlow<Boolean>(false)
val myFlow = _myFlow.asStateFlow()
// ...
fun someMethod(b: Boolean) {
_myFlow.value = b // same as _myFlow.emit(v), myFlow.tryEmit(b)
}
fun observe() {
myFlow.debounce(500)
.onEach { }
// flowOn(), catch{}
.launchIn(coroutineScope)
}
StateFlow
必须有初始值,如果你不想这样,这是解决方法:
private val _myFlow = MutableStateFlow<Boolean?>(null)
val myFlow = _myFlow.asStateFlow()
.filterNotNull()
MutableStateFlow
在设置新值时使用 .equals
比较,因此它不会一次又一次地发出相同的值(相对于使用引用比较的 distinctUntilChanged
)。
所以MutableStateFlow
≈BehaviorProcessor.distinctUntilChanged()
。如果你想要精确的 BehaviorProcessor
行为,那么你可以使用这个:
private val _myFlow = MutableSharedFlow<Boolean>(
replay = 1,
extraBufferCapacity = 0,
BufferOverflow.DROP_OLDEST
)
实际上 BroadcastChannel
已经过时了,Jetbrains 改变了他们使用 SharedFlows
的方法。这更简洁,更容易实施并解决了很多痛点。
基本上,你可以像这样实现同样的事情。
class BroadcastEventBus {
private val _events = MutableSharedFlow<Event>()
val events = _events.asSharedFlow() // read-only public view
suspend fun postEvent(event: Event) {
_events.emit(event) // suspends until subscribers receive it
}
}
要了解更多信息,请查看 Roman 的 Medium 文章。