传递覆盖 onSubscribe 的订阅者时调用 dispose()

Calling dispose() when passing a subscriber that overrides onSubscribe

我是 RxJava 的新手,如果我理解正确的话,ObserveronSubscribe 上传递给了 Disposable,所以如果 dispose() 它可以手动停止处理已经调用了。
我创建了以下代码:

@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
      private Disposable d;

      @Override
      public void onSubscribe(@NonNull Disposable d) {
           this.d = d;
      }

      @Override
      public void onNext(@NonNull Long aLong) {
           if(!d.isDisposed()) {
              System.out.println("Number onNext = " + aLong);
           }
      }

       @Override
       public void onError(@NonNull Throwable e) {

       }

       @Override
       public void onComplete() {
           System.out.println("completed");
       }
 });

但我不知道如何为该订阅调用 dispose()subscribeObserver 作为参数传递 returns voidsubscribeWith 不接受我的 Observer 没有编译错误。

这应该如何运作?我在这里误解了什么?

您可以使用 DisposableObserver,它可以在您完成观察后轻松处理。

@NonNull Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
    src.subscribe(new DisposableObserver<Long>() {
        @Override
        public void onNext(@NotNull Long aLong) {
            //Do anything you want to do..
            dispose();
        }

        @Override
        public void onError(@NotNull Throwable e) {
            //Handle the errors here..
            dispose();
        }

        @Override
        public void onComplete() {
            dispose();
        }
    });

您还可以使用 CompositeDisposable 一次处置多个观察者,有关详细信息,请查看此内容。

https://www.tutorialspoint.com/rxjava/rxjava_compositedisposable.htm

ObservableJavaDocs有一个简单的例子:

Disposable d = Observable.just("Hello world!")
     .delay(1, TimeUnit.SECONDS)
     .subscribeWith(new DisposableObserver<String>() {
         @Override public void onStart() {
             System.out.println("Start!");
         }
         @Override public void onNext(String t) {
             System.out.println(t);
         }
         @Override public void onError(Throwable t) {
             t.printStackTrace();
         }
         @Override public void onComplete() {
             System.out.println("Done!");
         }
     });

 Thread.sleep(500);
 // the sequence can now be disposed via dispose()
 d.dispose();

编辑

以下示例是从 onSubscribe 方法中获取 Disposable 的方法,但通常不推荐使用:

// field in the owner class
Disposable disposable;

public void doReactive() {
    Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
    src.subscribe(new Observer<Long>() {

        @Override
        public void onSubscribe(@NonNull Disposable d) {
           disposable = d;
        }

        // ...
    });
}

public void cleanup() {
   if (disposable != null) {
       disposable.dispose();
       disposable = null;
   }
}

SerialDisposable sd = new SerialDisposable();

Observable<Long> src = Observable.interval(1, TimeUnit.SECONDS);
    src.subscribe(new Observer<Long>() {

        @Override
        public void onSubscribe(@NonNull Disposable d) {
           sd.set(d);
        }

        // ...
    });

// ...

sd.dispose();