flatMap 的执行量取决于观察者拉动,而需要单次执行
Amount of flatMap executions depends on observer pull, while single is needed
我有一个触发 http 请求逻辑 (flatMap) 的推送通知机制 (PublishSubject)。基本场景是,无论何时推送到达,都会进行单个 http 调用并将结果传播给多个观察者。
我已经为这个案例编写了一个简单的演示,但是 flatMap 会为每个注册的观察者执行,而我希望它在每次推送时只被触发一次。
PublishSubject<Integer> subject = PublishSubject.create();
Observable<String> obs = subject.asObservable().flatMap(integer -> {
// this code runs for each observer, which is twice in this case
return Observable.just(String.valueOf(integer));
});
Observer mock = mock(Observer.class);
Observer mock1 = mock(Observer.class);
obs.subscribe(mock);
obs.subscribe(mock1);
subject.onNext(1);
你能提出修复建议吗?
谢谢
P.S。现在我正在使用 cache(1) 来解决这个问题,但我不确定这样做是否可以。此外,我不能完全理解为什么单个执行流将取决于附加的观察者数量。你能对此发表评论吗?
你已经在使用它了,你需要 publish()
+ connect()
或 publish().refCount()
如果你想让多个 observables 共享这个值。第一种情况可让您控制何时让 Observable
真正上线,而第二种情况会在您第一次订阅后立即上线。 RxJS 也有 share
包装 publish().refCount()
不确定 RxJava 是否也有。
PublishSubject<Integer> subject = PublishSubject.create();
Observable<String> obs = subject.asObservable().flatMap(integer -> {
// this code runs for each observer, which is twice in this case
return Observable.just(String.valueOf(integer));
}).publish().refCount();
Observer mock = mock(Observer.class);
Observer mock1 = mock(Observer.class);
obs.subscribe(mock);
obs.subscribe(mock1);
subject.onNext(1);
我有一个触发 http 请求逻辑 (flatMap) 的推送通知机制 (PublishSubject)。基本场景是,无论何时推送到达,都会进行单个 http 调用并将结果传播给多个观察者。
我已经为这个案例编写了一个简单的演示,但是 flatMap 会为每个注册的观察者执行,而我希望它在每次推送时只被触发一次。
PublishSubject<Integer> subject = PublishSubject.create();
Observable<String> obs = subject.asObservable().flatMap(integer -> {
// this code runs for each observer, which is twice in this case
return Observable.just(String.valueOf(integer));
});
Observer mock = mock(Observer.class);
Observer mock1 = mock(Observer.class);
obs.subscribe(mock);
obs.subscribe(mock1);
subject.onNext(1);
你能提出修复建议吗?
谢谢
P.S。现在我正在使用 cache(1) 来解决这个问题,但我不确定这样做是否可以。此外,我不能完全理解为什么单个执行流将取决于附加的观察者数量。你能对此发表评论吗?
你已经在使用它了,你需要 publish()
+ connect()
或 publish().refCount()
如果你想让多个 observables 共享这个值。第一种情况可让您控制何时让 Observable
真正上线,而第二种情况会在您第一次订阅后立即上线。 RxJS 也有 share
包装 publish().refCount()
不确定 RxJava 是否也有。
PublishSubject<Integer> subject = PublishSubject.create();
Observable<String> obs = subject.asObservable().flatMap(integer -> {
// this code runs for each observer, which is twice in this case
return Observable.just(String.valueOf(integer));
}).publish().refCount();
Observer mock = mock(Observer.class);
Observer mock1 = mock(Observer.class);
obs.subscribe(mock);
obs.subscribe(mock1);
subject.onNext(1);