如何创建一个 Observable 来在 Observables 完成后合并一个 Observables 流?

How to create an Observable that will marge a stream of Observables after they complete?

我正在使用 Jetty WebSocket 客户端从客户端设置 WebSocket 连接。我正在尝试创建一个 class,它将以 Observable 的形式提供事件流。

我设法通过编写一个 POJO @WebSocket class 将所有内容发布到 SerializedSubject<SocketEvent, SocketEvent> eventSubject = new SerializedSubject<>(PublishSubject.create()); 并且一切正常。

如何让它在每次连接中断时重新连接?

我尝试从 Observable.interval 开始,然后 flatMap-ing 到 Observable<ObservableSocket.SocketEvent> connect(String url) 每个连接 returns Observable。

Observable<Long> reconnectObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);
Observable<ObservableSocket.SocketEvent> composed = reconnectObservable.flatMap(aLong -> {
    try {
        System.out.println("Connect");
        return connect(url);
    } catch (Exception e) {
        System.out.println("Exception: " + e);
        return Observable.just(new ObservableSocket.SocketCloseEvent(999, "Exception: " + e));
    }
});

问题是,它每 1 秒创建一个连接。如何让 flatMap 等待内部 Observable 完成?

Observable.range(0, Integer.MAX_VALUE).concatMap(tick -> { ... });

concatMap 维护一个 SerialSubscription,一次只会订阅一个发出的可观察对象,等待每个对象终止。该范围提供了无限信号(在这种情况下,无限结束于大约 20 亿 :P),并且 concatMap 将根据创建的每个内部可观察对象一次一个连接。