在 RxJava 中,如何启动从 API 生成的潜在无限事件流?
In RxJava, how do I start a potentially infinite stream of events generated from an API?
我有一个 API 可供客户使用,可以简化为:
public class API {
public void sendEvent(Event e);
}
Event
实例在客户端调用 API 时进入我的系统(技术上通过 Binder 进入 Service
派生),然后处理、过滤并分派到其他内部组件。我不关心过去的事件,只关心订阅者订阅时可用的事件。它似乎很适合 Rx 范式,我刚刚开始接触它。
我需要一个创建一次的 Observable,允许多个订阅者,并且可以提供 Event
的实例,然后通过反应管道发送给观察者。 Subject
似乎适合我想要做的事情(特别是 this answer to this question 引起了我的共鸣)。
其他 RxJava 用户推荐什么?
例如,在我的简短评论之后:
public class API implements OnSubscribe<Event> {
private List<Subscriber<Event>> subscribers = new ArrayList<>();
public void sendEvent(Event event) {
// Do whatever you need with the event
for (Subscriber<Event> sub : subscribers) {
sub.onNext(event);
}
}
public void call(Subscriber<Event> sub) {
subscribers.add(sub);
}
}
那么你可能在某处有一个实例:API api = ...
你的 Observable 是这样获得的:Observable.create(api);
然后你可以做任何你会用 Observable 做的正常事情。
过滤未订阅的 Subscriber
留给 reader.
作为练习
编辑
更多研究表明 PublishSubject
应该有所帮助:
public class API {
private PublishSubject<Event> subject = PublishSubject.create();
public void sendEvent(Event event) {
// Do whatever you need with the event
// Then publish it
subject.onNext(event);
}
public Observable<Event> getObservable() {
return subject.asObservable();
}
}
这样就可以订阅这个Observable,每次发送一个事件给API
,就发布给所有的订阅者
这样使用:
API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
尝试 observable.share()
,它在幕后调用 .publish().refCount()
。它将仅使用一个基础订阅并为您提供您指定的多个订阅行为。
我有一个 API 可供客户使用,可以简化为:
public class API {
public void sendEvent(Event e);
}
Event
实例在客户端调用 API 时进入我的系统(技术上通过 Binder 进入 Service
派生),然后处理、过滤并分派到其他内部组件。我不关心过去的事件,只关心订阅者订阅时可用的事件。它似乎很适合 Rx 范式,我刚刚开始接触它。
我需要一个创建一次的 Observable,允许多个订阅者,并且可以提供 Event
的实例,然后通过反应管道发送给观察者。 Subject
似乎适合我想要做的事情(特别是 this answer to this question 引起了我的共鸣)。
其他 RxJava 用户推荐什么?
例如,在我的简短评论之后:
public class API implements OnSubscribe<Event> {
private List<Subscriber<Event>> subscribers = new ArrayList<>();
public void sendEvent(Event event) {
// Do whatever you need with the event
for (Subscriber<Event> sub : subscribers) {
sub.onNext(event);
}
}
public void call(Subscriber<Event> sub) {
subscribers.add(sub);
}
}
那么你可能在某处有一个实例:API api = ...
你的 Observable 是这样获得的:Observable.create(api);
然后你可以做任何你会用 Observable 做的正常事情。
过滤未订阅的 Subscriber
留给 reader.
编辑
更多研究表明 PublishSubject
应该有所帮助:
public class API {
private PublishSubject<Event> subject = PublishSubject.create();
public void sendEvent(Event event) {
// Do whatever you need with the event
// Then publish it
subject.onNext(event);
}
public Observable<Event> getObservable() {
return subject.asObservable();
}
}
这样就可以订阅这个Observable,每次发送一个事件给API
,就发布给所有的订阅者
这样使用:
API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
尝试 observable.share()
,它在幕后调用 .publish().refCount()
。它将仅使用一个基础订阅并为您提供您指定的多个订阅行为。