RxJava 使用 autoConnect 处理 ConnectableObservable

RxJava dispose of ConnectableObservable with autoConnect

我有一个上游,它以块的形式发出数据。应该使用 throttleFirst 来限制此流。此外,在所有节流计时器完成后,应发出最后一个值。不幸的是,RxJava 2 中没有 throttleFierstBuffered 运算符,因此我实现了一个 ObservableTransformer:

upstream -> {
      Observable<T> autoConnectingUpstream =
          upstream //
              .publish()
              .autoConnect(2);

      return Observable.merge(
              autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
              autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
          //if debounce and throttle emit the same item
          .distinctUntilChanged();
    }

除处理外,它都很好用。在处理生成的 Observable 之后,我也想处理上游。我该怎么做?

我已经尝试使用 autoConnect(2, disposable -> {}) 访问 disposable,但必须有更好的方法。到目前为止我得到了这个但我不喜欢它:

Observable.<T>create(
            emitter -> {
              Observable<T> autoConnectingUpstream =
                  upstream //
                      .publish()
                      .autoConnect(2, emitter::setDisposable);

              Observable.merge(
                      autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
                      autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
                  //if debounce and throttle emit the same item
                  .distinctUntilChanged()
                  .subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
            });

我在这里回答我自己的问题,所以如果我错了请告诉我。

根据 akarnokd 的评论,解决方案如下所示:

Observable.<T>create(
        emitter -> {
          Observable<T> autoConnectingUpstream =
              upstream //
                  .publish()
                  .autoConnect(2, emitter::setDisposable);

          Observable.merge(
                  autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
                  autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
              //if debounce and throttle emit the same item
              .distinctUntilChanged()
              .subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
        });

autoConnect 的第二个参数是一个 Action,表示已建立的 2 个已连接观察者的连接。

这可以与 emitter::setDisposable 一起使用,以便在观察者处理生成的 Observable 时处理自动连接。