RxJava 的扫描在每个新订阅者上用 initialValue 覆盖当前值
RxJava's scan overrides the current value with initialValue on every new subscriber
使用此代码从 ViewModel
:
发出 states
private val stateCommandRelay: PublishRelay<StateCommand> by lazy {
PublishRelay.create<StateCommand>()
}
val states: Flowable<STATE> by lazy {
stateCommandRelay
.doOnNext { Log.d(className(), "----> ${it.javaClass.simpleName}") }
.scan(initialState()) { previous: STATE, command: StateCommand ->
Log.d(className(), "Reducing with command: ${command.javaClass.simpleName}")
reducer().reduce(previous, command)
}
.doOnNext { Log.d(className(), "STATE: $it") }
.toFlowable(BackpressureStrategy.LATEST)
.replayingShare()
}
这是在 Activity
/Fragment
中使用以下代码订阅的:
viewModel.states.subscribeBy(onNext = { render(it) })
它在接收 StateCommands
和发出新状态时运行良好,传播初始 state
而无需等待第一个 stateCommand。
重新订阅 states
时出现问题。 initialValue
重新发出,覆盖当前状态并基本上重置状态。
从日志中我可以看到没有 StateCommands
从 stateCommandRelay
传递,也没有调用 reducer.reduce()
方法,它只是发出这个新值并随后调用 reducer.reduce()
将重置状态为 previous
。
我错过了什么吗?我认为它只会在第一次订阅时被调用,replayingShare()
应该负责让它只发生一次。
试试这个:
.scan(initialState()) { previous: STATE, command: StateCommand ->
Log.d(className(), "Reducing with command: ${command.javaClass.simpleName}")
reducer().reduce(previous, command)
}
.replay(1)
.autoConnect(0)
.toFlowable(BackpressureStrategy.LATEST)
//.replayingShare()
使用此代码从 ViewModel
:
states
private val stateCommandRelay: PublishRelay<StateCommand> by lazy {
PublishRelay.create<StateCommand>()
}
val states: Flowable<STATE> by lazy {
stateCommandRelay
.doOnNext { Log.d(className(), "----> ${it.javaClass.simpleName}") }
.scan(initialState()) { previous: STATE, command: StateCommand ->
Log.d(className(), "Reducing with command: ${command.javaClass.simpleName}")
reducer().reduce(previous, command)
}
.doOnNext { Log.d(className(), "STATE: $it") }
.toFlowable(BackpressureStrategy.LATEST)
.replayingShare()
}
这是在 Activity
/Fragment
中使用以下代码订阅的:
viewModel.states.subscribeBy(onNext = { render(it) })
它在接收 StateCommands
和发出新状态时运行良好,传播初始 state
而无需等待第一个 stateCommand。
重新订阅 states
时出现问题。 initialValue
重新发出,覆盖当前状态并基本上重置状态。
从日志中我可以看到没有 StateCommands
从 stateCommandRelay
传递,也没有调用 reducer.reduce()
方法,它只是发出这个新值并随后调用 reducer.reduce()
将重置状态为 previous
。
我错过了什么吗?我认为它只会在第一次订阅时被调用,replayingShare()
应该负责让它只发生一次。
试试这个:
.scan(initialState()) { previous: STATE, command: StateCommand ->
Log.d(className(), "Reducing with command: ${command.javaClass.simpleName}")
reducer().reduce(previous, command)
}
.replay(1)
.autoConnect(0)
.toFlowable(BackpressureStrategy.LATEST)
//.replayingShare()