在 RxJava 中,如何启动从 API 生成的潜在无限事件流?

In RxJava, how do I start a potentially infinite stream of events generated from an API?

我有一个 API 可供客户使用,可以简化为:

public class API {
  public void sendEvent(Event e);
}

Event 实例在客户端调用 API 时进入我的系统(技术上通过 Binder 进入 Service 派生),然后处理、过滤并分派到其他内部组件。我不关心过去的事件,只关心订阅者订阅时可用的事件。它似乎很适合 Rx 范式,我刚刚开始接触它。

我需要一个创建一次的 Observable,允许多个订阅者,并且可以提供 Event 的实例,然后通过反应管道发送给观察者。 Subject 似乎适合我想要做的事情(特别是 this answer to this question 引起了我的共鸣)。

其他 RxJava 用户推荐什么?

例如,在我的简短评论之后:

public class API implements OnSubscribe<Event> {
    private List<Subscriber<Event>> subscribers = new ArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<Event> sub : subscribers) {
            sub.onNext(event);
        }
    }
    public void call(Subscriber<Event> sub) {
        subscribers.add(sub);
    }
}

那么你可能在某处有一个实例:API api = ...

你的 Observable 是这样获得的:Observable.create(api); 然后你可以做任何你会用 Observable 做的正常事情。

过滤未订阅的 Subscriber 留给 reader.

作为练习

编辑

更多研究表明 PublishSubject 应该有所帮助:

public class API {
    private PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it
        subject.onNext(event);
    }
    public Observable<Event> getObservable() {
        return subject.asObservable();
    }
}

这样就可以订阅这个Observable,每次发送一个事件给API,就发布给所有的订阅者

这样使用:

API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));

尝试 observable.share(),它在幕后调用 .publish().refCount()。它将仅使用一个基础订阅并为您提供您指定的多个订阅行为。