RxJava 的扫描在每个新订阅者上用 initialValue 覆盖当前值

RxJava's scan overrides the current value with initialValue on every new subscriber

使用此代码从 ViewModel:

发出 states
private val stateCommandRelay: PublishRelay<StateCommand> by lazy {
    PublishRelay.create<StateCommand>()
}

val states: Flowable<STATE>  by lazy {
    stateCommandRelay
            .doOnNext { Log.d(className(), "----> ${it.javaClass.simpleName}") }
            .scan(initialState()) { previous: STATE, command: StateCommand ->
                Log.d(className(), "Reducing with command: ${command.javaClass.simpleName}")
                reducer().reduce(previous, command)
            }
            .doOnNext { Log.d(className(), "STATE: $it") }
            .toFlowable(BackpressureStrategy.LATEST)
            .replayingShare()
}

这是在 Activity/Fragment 中使用以下代码订阅的:

viewModel.states.subscribeBy(onNext = { render(it) })

它在接收 StateCommands 和发出新状态时运行良好,传播初始 state 而无需等待第一个 stateCommand。

重新订阅 states 时出现问题。 initialValue 重新发出,覆盖当前状态并基本上重置状态。

从日志中我可以看到没有 StateCommandsstateCommandRelay 传递,也没有调用 reducer.reduce() 方法,它只是发出这个新值并随后调用 reducer.reduce() 将重置状态为 previous

我错过了什么吗?我认为它只会在第一次订阅时被调用,replayingShare() 应该负责让它只发生一次。

试试这个:

        .scan(initialState()) { previous: STATE, command: StateCommand ->
            Log.d(className(), "Reducing with command: ${command.javaClass.simpleName}")
            reducer().reduce(previous, command)
        }
        .replay(1)
        .autoConnect(0)
        .toFlowable(BackpressureStrategy.LATEST)
        //.replayingShare()