如何创建一个 Observable 来在 Observables 完成后合并一个 Observables 流?
How to create an Observable that will marge a stream of Observables after they complete?
我正在使用 Jetty WebSocket 客户端从客户端设置 WebSocket 连接。我正在尝试创建一个 class,它将以 Observable 的形式提供事件流。
我设法通过编写一个 POJO @WebSocket
class 将所有内容发布到 SerializedSubject<SocketEvent, SocketEvent> eventSubject = new SerializedSubject<>(PublishSubject.create());
并且一切正常。
如何让它在每次连接中断时重新连接?
我尝试从 Observable.interval
开始,然后 flatMap
-ing 到 Observable<ObservableSocket.SocketEvent> connect(String url)
每个连接 returns Observable。
Observable<Long> reconnectObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);
Observable<ObservableSocket.SocketEvent> composed = reconnectObservable.flatMap(aLong -> {
try {
System.out.println("Connect");
return connect(url);
} catch (Exception e) {
System.out.println("Exception: " + e);
return Observable.just(new ObservableSocket.SocketCloseEvent(999, "Exception: " + e));
}
});
问题是,它每 1 秒创建一个连接。如何让 flatMap 等待内部 Observable 完成?
Observable.range(0, Integer.MAX_VALUE).concatMap(tick -> { ... });
concatMap 维护一个 SerialSubscription,一次只会订阅一个发出的可观察对象,等待每个对象终止。该范围提供了无限信号(在这种情况下,无限结束于大约 20 亿 :P),并且 concatMap 将根据创建的每个内部可观察对象一次一个连接。
我正在使用 Jetty WebSocket 客户端从客户端设置 WebSocket 连接。我正在尝试创建一个 class,它将以 Observable 的形式提供事件流。
我设法通过编写一个 POJO @WebSocket
class 将所有内容发布到 SerializedSubject<SocketEvent, SocketEvent> eventSubject = new SerializedSubject<>(PublishSubject.create());
并且一切正常。
如何让它在每次连接中断时重新连接?
我尝试从 Observable.interval
开始,然后 flatMap
-ing 到 Observable<ObservableSocket.SocketEvent> connect(String url)
每个连接 returns Observable。
Observable<Long> reconnectObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);
Observable<ObservableSocket.SocketEvent> composed = reconnectObservable.flatMap(aLong -> {
try {
System.out.println("Connect");
return connect(url);
} catch (Exception e) {
System.out.println("Exception: " + e);
return Observable.just(new ObservableSocket.SocketCloseEvent(999, "Exception: " + e));
}
});
问题是,它每 1 秒创建一个连接。如何让 flatMap 等待内部 Observable 完成?
Observable.range(0, Integer.MAX_VALUE).concatMap(tick -> { ... });
concatMap 维护一个 SerialSubscription,一次只会订阅一个发出的可观察对象,等待每个对象终止。该范围提供了无限信号(在这种情况下,无限结束于大约 20 亿 :P),并且 concatMap 将根据创建的每个内部可观察对象一次一个连接。