为什么 Flowable.subscribe(订阅者) return 不是一次性的?

Why doesn't Flowable.subscribe(Subscriber) return a Disposable?

大多数 Flowable.subscribe() 重载 return a Disposable 可以清理流。我习惯于这样做:

Disposable d = Flowable.just()
    .map(...)
    .subscribe(
        n -> ...
        t -> ...
        () -> ...
    );

// someone clicks "cancel" in another thread
d.dispose();

但是,当使用 .subscribe(Subscriber) 时,Disposable 不是 returned。我想使用 .subscribe(Subscriber) 这样我就可以传入 TestSubscriber 来验证行为。那么在这种情况下我将如何处理流程?

我在 Javadoc 中搜索了合适的 SubscriberDisposableSubscriber 看起来可行,但有两个问题:

  1. class 描述如下,这表明 cancel() 不能从流程外部使用:

Use the protected request(long) to request more items and cancel() to cancel the sequence from within an onNext implementation.

  1. TestSubscriber 不扩展 DisposableSubscriber。

您可以使用 Flowable.subscribeWith(Subscriber) 而不是 subscribe,因此您的 Subscriber 是 return,而不是 void

在 RxJava 中 3.x TestSubscriber 不再实现 Disposable。它确实实现了 dispose and isDisposed methods, as defined by BaseTestConsumer, which it extends. However, both of those methods have been made protected, so you can't actually use them directly. Luckily, there is TestSubscriber.cancel()/TestSubscriber.isCancelled(),它们是 public,等同于 dispose()/isDisposed(),因此您可以使用它们。

至于Flowable.subscribe没有return一个Disposable的原因,这个是在RxJava 2里改的,to adhere to the Reactive-Streams specification

Due to the Reactive-Streams specification, Publisher.subscribe returns void ...To remedy this, the method E subscribeWith(E subscriber) has been added to each base reactive class which returns its input subscriber/observer as is.