使用 RXJava 扫描运算符时确保顺序状态更新

Ensure sequential state update when using RXJava scan operator

我正在尝试使用 RXJava 实现 redux 状态更新模式

val subject=PublishSubject.create()
val subject1=PublishSubject.create()

// multiple threads posting 
// on subject and subject1 here. Concurrently 


subject.mergeWith(subject1)
       .scan(
             getInitState(),
             {state, event ->
               // state update here 
             }
         )
        .subscribe({state ->
          // use state here
        })

如您所见,我正在使用 scan 运算符来维护状态。

即使多个线程正在生成事件,我如何确保状态更新按顺序发生?

scan 运算符中是否有某种机制使事件在等待当前状态更新功能完成时站在某个队列中?

我做了什么:

我已经在 Android 环境中成功实现了这个模式。这真的很容易,因为如果你总是在

中进行状态更新

AndroidSchedulers.mainThread()

并使状态对象不可变,保证您可以进行原子和顺序状态更新。但是,如果您没有专门的状态更新调度程序会怎样?如果您不在 Android 怎么办?

我研究的内容:

要强制在单线程上执行,您可以显式创建单线程调度程序来替换 AndroidSchedulers.mainThread()

val singleThreadScheduler = Schedulers.single()

即使事件是在其他线程上发出的,您也可以使用 observeOn:

确保仅在您的单个线程上处理它们
subject.mergeWith(subject1)
   .observeOn(singleThreadScheduler)
   .scan(
         getInitState(),
         {state, event ->
           // state update here 
         }
     )
    .subscribe({state ->
      // use state here
    })  

observeOnsubscribeOn 之间的区别可能会让人非常困惑,记录线程 ID 有助于检查您期望的线程上的所有内容 运行。

http://reactivex.io/documentation/scheduler.html