单个 Observable 是否按顺序通知其所有观察者?

Does a single Observable notify sequentially to all its Observers?

我根据 PublishSubject 编写了 ObservableList。每次客户端向此列表添加一个元素时,观察者都会通过主题上的 onNext 方法得到通知。

我编写了两个示例观察者并将它们订阅到 ObservableList。我注意到通知是顺序的并且是阻塞的。因此,如果我订阅了这两个观察者,则会发生以下情况:

我真的不在乎 id 调用是顺序的,但想了解为什么通知也被阻塞以及我怎样才能让它不被阻塞。

这是 ObservableList

的代码
package co.com.subjects.example;

import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.functions.Action1;
import rx.subjects.PublishSubject;

public class ObservableList<T>{

    public String nombre;
    protected final List<T> list;
    protected final PublishSubject<T> onAdd;

    public ObservableList(String nombre) {
        this.list = new ArrayList<T>();
        this.onAdd = PublishSubject.create();
        this.nombre = nombre;
    }

    public void add(T value) {
        list.add(value);
        onAdd.onNext(value);
    }

    public Observable<T> getObservable() {
        return onAdd;
    }
}

您可以通过将观察者包装在一个观察者中,并在 ExecutorService 中通知它来使其成为非阻塞。

显然,将任务添加到另一个池是相当昂贵的,所以我只会在您知道这需要一段时间时才这样做。另请注意,除非您小心,否则以这种方式发布的事件可能会乱序。一个简单的解决方案是为每个侦听器设置一个单线程执行器。

want to understand why the notification is blocking

RxJava 的实现假设执行一个 Observer 的 onNext 方法总是快速且便宜的,所以 PublishSubject 只是一个接一个地调用它的 Observers 的所有 onNext 方法,没有引入任何并发。

as well and how can I make it not to be blocking

在订阅您的观察者之前,您可以插入 .observeOn(Schedulers.computation())(或不同的调度程序,具体取决于您的需要),以便 onNext 调用在线程池上执行。