单个 Observable 是否按顺序通知其所有观察者?
Does a single Observable notify sequentially to all its Observers?
我根据 PublishSubject
编写了 ObservableList
。每次客户端向此列表添加一个元素时,观察者都会通过主题上的 onNext
方法得到通知。
我编写了两个示例观察者并将它们订阅到 ObservableList
。我注意到通知是顺序的并且是阻塞的。因此,如果我订阅了这两个观察者,则会发生以下情况:
- 观察者收到通知的顺序与订阅的顺序相同。
- 如果一个观察者正在阻塞,那么下一个观察者只会在前一个观察者结束执行时得到通知。
我真的不在乎 id 调用是顺序的,但想了解为什么通知也被阻塞以及我怎样才能让它不被阻塞。
这是 ObservableList
的代码
package co.com.subjects.example;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
public class ObservableList<T>{
public String nombre;
protected final List<T> list;
protected final PublishSubject<T> onAdd;
public ObservableList(String nombre) {
this.list = new ArrayList<T>();
this.onAdd = PublishSubject.create();
this.nombre = nombre;
}
public void add(T value) {
list.add(value);
onAdd.onNext(value);
}
public Observable<T> getObservable() {
return onAdd;
}
}
您可以通过将观察者包装在一个观察者中,并在 ExecutorService 中通知它来使其成为非阻塞。
显然,将任务添加到另一个池是相当昂贵的,所以我只会在您知道这需要一段时间时才这样做。另请注意,除非您小心,否则以这种方式发布的事件可能会乱序。一个简单的解决方案是为每个侦听器设置一个单线程执行器。
want to understand why the notification is blocking
RxJava 的实现假设执行一个 Observer 的 onNext
方法总是快速且便宜的,所以 PublishSubject
只是一个接一个地调用它的 Observers 的所有 onNext
方法,没有引入任何并发。
as well and how can I make it not to be blocking
在订阅您的观察者之前,您可以插入 .observeOn(Schedulers.computation())
(或不同的调度程序,具体取决于您的需要),以便 onNext
调用在线程池上执行。
我根据 PublishSubject
编写了 ObservableList
。每次客户端向此列表添加一个元素时,观察者都会通过主题上的 onNext
方法得到通知。
我编写了两个示例观察者并将它们订阅到 ObservableList
。我注意到通知是顺序的并且是阻塞的。因此,如果我订阅了这两个观察者,则会发生以下情况:
- 观察者收到通知的顺序与订阅的顺序相同。
- 如果一个观察者正在阻塞,那么下一个观察者只会在前一个观察者结束执行时得到通知。
我真的不在乎 id 调用是顺序的,但想了解为什么通知也被阻塞以及我怎样才能让它不被阻塞。
这是 ObservableList
package co.com.subjects.example;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
public class ObservableList<T>{
public String nombre;
protected final List<T> list;
protected final PublishSubject<T> onAdd;
public ObservableList(String nombre) {
this.list = new ArrayList<T>();
this.onAdd = PublishSubject.create();
this.nombre = nombre;
}
public void add(T value) {
list.add(value);
onAdd.onNext(value);
}
public Observable<T> getObservable() {
return onAdd;
}
}
您可以通过将观察者包装在一个观察者中,并在 ExecutorService 中通知它来使其成为非阻塞。
显然,将任务添加到另一个池是相当昂贵的,所以我只会在您知道这需要一段时间时才这样做。另请注意,除非您小心,否则以这种方式发布的事件可能会乱序。一个简单的解决方案是为每个侦听器设置一个单线程执行器。
want to understand why the notification is blocking
RxJava 的实现假设执行一个 Observer 的 onNext
方法总是快速且便宜的,所以 PublishSubject
只是一个接一个地调用它的 Observers 的所有 onNext
方法,没有引入任何并发。
as well and how can I make it not to be blocking
在订阅您的观察者之前,您可以插入 .observeOn(Schedulers.computation())
(或不同的调度程序,具体取决于您的需要),以便 onNext
调用在线程池上执行。