PublishSubject 与 Kotlin 协程(流程)

PublishSubject with Kotlin coroutines (Flow)

我使用了 PublishSubject,我正在向它发送消息,同时我也在监听结果。它工作完美,但现在我不确定如何使用 Kotlin 的协程(流或通道)做同样的事情。

private val subject = PublishProcessor.create<Boolean>>()

...

fun someMethod(b: Boolean) {
    subject.onNext(b)
}

fun observe() {
    subject.debounce(500, TimeUnit.MILLISECONDS)
           .subscribe { /* value received */ }
}

因为我需要 debounce 运算符,所以我真的想对流做同样的事情,所以我创建了一个通道,然后我尝试从该通道创建一个流并监听变化,但我没有得到任何结果.

private val channel = Channel<Boolean>()

...

fun someMethod(b: Boolean) {
    channel.send(b)
}

fun observe() {
    flow {
         channel.consumeEach { value ->
            emit(value)
         }
    }.debounce(500, TimeUnit.MILLISECONDS)
    .onEach {
        // value received
    }
}

怎么了?

Flow 是冷异步流,就像 Observable.

All transformations on the flow, such as map and filter do not trigger flow collection or execution, only terminal operators (e.g. single) do trigger it.

onEach方法只是一个转换。因此,您应该将其替换为终端流运算符 collect。您也可以使用 BroadcastChannel 来获得更清晰的代码:

private val channel = BroadcastChannel<Boolean>(1)

suspend fun someMethod(b: Boolean) {
    channel.send(b)
}

suspend fun observe() {
  channel
    .asFlow()
    .debounce(500)
    .collect {
        // value received
    }
}

更新: 在提出问题时,有两个参数的 debounce 过载(如问题中所示)。没有了。但是现在有一个以毫秒(长)为单位的参数。

Kotlin 协程中的 ArrayBroadcastChannel 与 PublishSubject 最相似。

  1. 像 PublishSubject 一样,一个 ArrayBroadcastChannel 可以有多个 订阅者和所有活跃订阅者都会立即收到通知。
  2. 与 PublishSubject 一样,如果此时没有活动订阅者,推送到此频道的事件将丢失。

与 PublishSubject 不同,背压内置于协程通道中,这就是缓冲容量的来源。这个数字实际上取决于通道用于哪种用例。对于大多数正常用例,我只选择 10 个,这应该绰绰有余。如果你将事件推送到这个通道的速度比接收者消耗它的速度快,你可以提供更多的容量。

对于PublishProcessor/PublishRelay

应该是SharedFlow/MutableSharedFlow
private val _myFlow = MutableSharedFlow<Boolean>(
                          replay = 0,
                          extraBufferCapacity = 1, // you can increase      
                          BufferOverflow.DROP_OLDEST
)
val myFlow = _myFlow.asSharedFlow()


// ...
fun someMethod(b: Boolean) {
    _myFlow.tryEmit(b)
}

fun observe() {
    myFlow.debounce(500)
          .onEach {  }
          // flowOn(), catch{}
          .launchIn(coroutineScope)

}

StateFlow/MutableStateFlowBehaviorProcessor/BehaviorRelay

private val _myFlow = MutableStateFlow<Boolean>(false)
val myFlow = _myFlow.asStateFlow()

// ...
fun someMethod(b: Boolean) {
    _myFlow.value = b // same as _myFlow.emit(v), myFlow.tryEmit(b)
}

fun observe() {
    myFlow.debounce(500)
          .onEach {  }
          // flowOn(), catch{}
          .launchIn(coroutineScope)

}

StateFlow 必须有初始值,如果你不想这样,这是解决方法:

private val _myFlow = MutableStateFlow<Boolean?>(null)
val myFlow = _myFlow.asStateFlow()
                    .filterNotNull()

MutableStateFlow 在设置新值时使用 .equals 比较,因此它不会一次又一次地发出相同的值(相对于使用引用比较的 distinctUntilChanged)。

所以MutableStateFlowBehaviorProcessor.distinctUntilChanged()。如果你想要精确的 BehaviorProcessor 行为,那么你可以使用这个:

private val _myFlow = MutableSharedFlow<Boolean>(
                              replay = 1, 
                              extraBufferCapacity = 0,
                              BufferOverflow.DROP_OLDEST
)

实际上 BroadcastChannel 已经过时了,Jetbrains 改变了他们使用 SharedFlows 的方法。这更简洁,更容易实施并解决了很多痛点。

基本上,你可以像这样实现同样的事情。

class BroadcastEventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow() // read-only public view

    suspend fun postEvent(event: Event) {
        _events.emit(event) // suspends until subscribers receive it
    }
}

要了解更多信息,请查看 Roman 的 Medium 文章。

"Shared flows, broadcast channels" by Roman Elizarov