使用 RXJava 扫描运算符时确保顺序状态更新
Ensure sequential state update when using RXJava scan operator
我正在尝试使用 RXJava 实现 redux 状态更新模式
val subject=PublishSubject.create()
val subject1=PublishSubject.create()
// multiple threads posting
// on subject and subject1 here. Concurrently
subject.mergeWith(subject1)
.scan(
getInitState(),
{state, event ->
// state update here
}
)
.subscribe({state ->
// use state here
})
如您所见,我正在使用 scan
运算符来维护状态。
即使多个线程正在生成事件,我如何确保状态更新按顺序发生?
scan
运算符中是否有某种机制使事件在等待当前状态更新功能完成时站在某个队列中?
我做了什么:
我已经在 Android 环境中成功实现了这个模式。这真的很容易,因为如果你总是在
中进行状态更新
AndroidSchedulers.mainThread()
并使状态对象不可变,保证您可以进行原子和顺序状态更新。但是,如果您没有专门的状态更新调度程序会怎样?如果您不在 Android 怎么办?
我研究的内容:
我已经阅读了 scan
运算符的源代码并且没有
等待 "queue" 涉及。只是简单的状态更新和发射
我也看过 SerializedSubject 的源代码。确实有一个序列化排放的等待队列。但是如果我有两个主题会怎样?连载两者不代表不互相干扰
要强制在单线程上执行,您可以显式创建单线程调度程序来替换 AndroidSchedulers.mainThread()
:
val singleThreadScheduler = Schedulers.single()
即使事件是在其他线程上发出的,您也可以使用 observeOn
:
确保仅在您的单个线程上处理它们
subject.mergeWith(subject1)
.observeOn(singleThreadScheduler)
.scan(
getInitState(),
{state, event ->
// state update here
}
)
.subscribe({state ->
// use state here
})
observeOn
和 subscribeOn
之间的区别可能会让人非常困惑,记录线程 ID 有助于检查您期望的线程上的所有内容 运行。
我正在尝试使用 RXJava 实现 redux 状态更新模式
val subject=PublishSubject.create()
val subject1=PublishSubject.create()
// multiple threads posting
// on subject and subject1 here. Concurrently
subject.mergeWith(subject1)
.scan(
getInitState(),
{state, event ->
// state update here
}
)
.subscribe({state ->
// use state here
})
如您所见,我正在使用 scan
运算符来维护状态。
即使多个线程正在生成事件,我如何确保状态更新按顺序发生?
scan
运算符中是否有某种机制使事件在等待当前状态更新功能完成时站在某个队列中?
我做了什么:
我已经在 Android 环境中成功实现了这个模式。这真的很容易,因为如果你总是在
中进行状态更新AndroidSchedulers.mainThread()
并使状态对象不可变,保证您可以进行原子和顺序状态更新。但是,如果您没有专门的状态更新调度程序会怎样?如果您不在 Android 怎么办?
我研究的内容:
我已经阅读了
scan
运算符的源代码并且没有 等待 "queue" 涉及。只是简单的状态更新和发射我也看过 SerializedSubject 的源代码。确实有一个序列化排放的等待队列。但是如果我有两个主题会怎样?连载两者不代表不互相干扰
要强制在单线程上执行,您可以显式创建单线程调度程序来替换 AndroidSchedulers.mainThread()
:
val singleThreadScheduler = Schedulers.single()
即使事件是在其他线程上发出的,您也可以使用 observeOn
:
subject.mergeWith(subject1)
.observeOn(singleThreadScheduler)
.scan(
getInitState(),
{state, event ->
// state update here
}
)
.subscribe({state ->
// use state here
})
observeOn
和 subscribeOn
之间的区别可能会让人非常困惑,记录线程 ID 有助于检查您期望的线程上的所有内容 运行。