RxJava2 - 使用 PublishSubject 发射项目
RxJava2 - Emitting items using PublishSubject
我有一个场景
subject1: PublishSubject
和 subject2:BehaviorSubject
.
首先,我为 subject1
发射单个物品,然后为 subject2
发射物品,但紧接着我还想为 subject1
发射不同的物品。
fun emittingItems() {
subject1.onNext(functionA1)
subject2.onNext(functionB)
if (something) subject1.onNext(functionA2)
}
发生的情况是,我收到了以下顺序的项目:functionA1
、functionA2
、functionB
。
为什么我会出现这种行为?我如何按以下顺序发出项目:functionA1
、functionB
、functionA2
.
正在订阅主题:
val disposable = viewModel.subject1
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::someFunction())
disposables.add(disposable)
使用 observeOn(AndroidSchedulers.mainThread())
您可以安排主线程上的事件传播。调度本身是顺序的,而每个调度的 Runnable
可能会处理添加到队列中的多个元素。
这是一种竞争条件,在主线程本身调用 emittingItems()
时肯定会出现,在从任何其他线程调用它时也可能出现。
但是由于您正在处理两个不同的异步流,因此您不能指望在两个不同的观察者中进行任何顺序观察。
您可以通过将两个来源合并为一个流来实现给定的目标:
Observable.merge(subject1, subject2)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subject);
我有一个场景
subject1: PublishSubject
和 subject2:BehaviorSubject
.
首先,我为 subject1
发射单个物品,然后为 subject2
发射物品,但紧接着我还想为 subject1
发射不同的物品。
fun emittingItems() {
subject1.onNext(functionA1)
subject2.onNext(functionB)
if (something) subject1.onNext(functionA2)
}
发生的情况是,我收到了以下顺序的项目:functionA1
、functionA2
、functionB
。
为什么我会出现这种行为?我如何按以下顺序发出项目:functionA1
、functionB
、functionA2
.
正在订阅主题:
val disposable = viewModel.subject1
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::someFunction())
disposables.add(disposable)
使用 observeOn(AndroidSchedulers.mainThread())
您可以安排主线程上的事件传播。调度本身是顺序的,而每个调度的 Runnable
可能会处理添加到队列中的多个元素。
这是一种竞争条件,在主线程本身调用 emittingItems()
时肯定会出现,在从任何其他线程调用它时也可能出现。
但是由于您正在处理两个不同的异步流,因此您不能指望在两个不同的观察者中进行任何顺序观察。
您可以通过将两个来源合并为一个流来实现给定的目标:
Observable.merge(subject1, subject2)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subject);