RxJava2 - 使用 PublishSubject 发射项目

RxJava2 - Emitting items using PublishSubject

我有一个场景

subject1: PublishSubjectsubject2:BehaviorSubject.

首先,我为 subject1 发射单个物品,然后为 subject2 发射物品,但紧接着我还想为 subject1 发射不同的物品。

fun emittingItems() {
    subject1.onNext(functionA1)
    subject2.onNext(functionB)
    if (something) subject1.onNext(functionA2)
}

发生的情况是,我收到了以下顺序的项目:functionA1functionA2functionB

为什么我会出现这种行为?我如何按以下顺序发出项目:functionA1functionBfunctionA2.

正在订阅主题:

val disposable = viewModel.subject1
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::someFunction())
disposables.add(disposable)

使用 observeOn(AndroidSchedulers.mainThread()) 您可以安排主线程上的事件传播。调度本身是顺序的,而每个调度的 Runnable 可能会处理添加到队列中的多个元素。

这是一种竞争条件,在主线程本身调用 emittingItems() 时肯定会出现,在从任何其他线程调用它时也可能出现。

但是由于您正在处理两个不同的异步流,因此您不能指望在两个不同的观察者中进行任何顺序观察。

您可以通过将两个来源合并为一个流来实现给定的目标:

Observable.merge(subject1, subject2)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subject);