RxJava 使用 autoConnect 处理 ConnectableObservable
RxJava dispose of ConnectableObservable with autoConnect
我有一个上游,它以块的形式发出数据。应该使用 throttleFirst 来限制此流。此外,在所有节流计时器完成后,应发出最后一个值。不幸的是,RxJava 2 中没有 throttleFierstBuffered 运算符,因此我实现了一个 ObservableTransformer:
upstream -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2);
return Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged();
}
除处理外,它都很好用。在处理生成的 Observable 之后,我也想处理上游。我该怎么做?
我已经尝试使用 autoConnect(2, disposable -> {}) 访问 disposable,但必须有更好的方法。到目前为止我得到了这个但我不喜欢它:
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
我在这里回答我自己的问题,所以如果我错了请告诉我。
根据 akarnokd 的评论,解决方案如下所示:
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
autoConnect 的第二个参数是一个 Action,表示已建立的 2 个已连接观察者的连接。
这可以与 emitter::setDisposable 一起使用,以便在观察者处理生成的 Observable 时处理自动连接。
我有一个上游,它以块的形式发出数据。应该使用 throttleFirst 来限制此流。此外,在所有节流计时器完成后,应发出最后一个值。不幸的是,RxJava 2 中没有 throttleFierstBuffered 运算符,因此我实现了一个 ObservableTransformer:
upstream -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2);
return Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged();
}
除处理外,它都很好用。在处理生成的 Observable 之后,我也想处理上游。我该怎么做?
我已经尝试使用 autoConnect(2, disposable -> {}) 访问 disposable,但必须有更好的方法。到目前为止我得到了这个但我不喜欢它:
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
我在这里回答我自己的问题,所以如果我错了请告诉我。
根据 akarnokd 的评论,解决方案如下所示:
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
autoConnect 的第二个参数是一个 Action,表示已建立的 2 个已连接观察者的连接。
这可以与 emitter::setDisposable 一起使用,以便在观察者处理生成的 Observable 时处理自动连接。