为什么 Flowable.subscribe(订阅者) return 不是一次性的?
Why doesn't Flowable.subscribe(Subscriber) return a Disposable?
大多数 Flowable.subscribe()
重载 return a Disposable
可以清理流。我习惯于这样做:
Disposable d = Flowable.just()
.map(...)
.subscribe(
n -> ...
t -> ...
() -> ...
);
// someone clicks "cancel" in another thread
d.dispose();
但是,当使用 .subscribe(Subscriber)
时,Disposable
不是 returned。我想使用 .subscribe(Subscriber)
这样我就可以传入 TestSubscriber
来验证行为。那么在这种情况下我将如何处理流程?
我在 Javadoc 中搜索了合适的 Subscriber
。 DisposableSubscriber
看起来可行,但有两个问题:
- class 描述如下,这表明
cancel()
不能从流程外部使用:
Use the protected request(long) to request more items and cancel() to cancel the sequence from within an onNext implementation.
- TestSubscriber 不扩展 DisposableSubscriber。
您可以使用 Flowable.subscribeWith(Subscriber)
而不是 subscribe
,因此您的 Subscriber
是 return,而不是 void
。
在 RxJava 中 3.x TestSubscriber
不再实现 Disposable
。它确实实现了 dispose
and isDisposed
methods, as defined by BaseTestConsumer
, which it extends. However, both of those methods have been made protected
, so you can't actually use them directly. Luckily, there is TestSubscriber.cancel()
/TestSubscriber.isCancelled()
,它们是 public,等同于 dispose()
/isDisposed()
,因此您可以使用它们。
至于Flowable.subscribe
没有return一个Disposable
的原因,这个是在RxJava 2里改的,to adhere to the Reactive-Streams specification:
Due to the Reactive-Streams specification, Publisher.subscribe
returns void
...To remedy this, the method E subscribeWith(E subscriber)
has been added to each base reactive class which returns its input subscriber/observer as is.
大多数 Flowable.subscribe()
重载 return a Disposable
可以清理流。我习惯于这样做:
Disposable d = Flowable.just()
.map(...)
.subscribe(
n -> ...
t -> ...
() -> ...
);
// someone clicks "cancel" in another thread
d.dispose();
但是,当使用 .subscribe(Subscriber)
时,Disposable
不是 returned。我想使用 .subscribe(Subscriber)
这样我就可以传入 TestSubscriber
来验证行为。那么在这种情况下我将如何处理流程?
我在 Javadoc 中搜索了合适的 Subscriber
。 DisposableSubscriber
看起来可行,但有两个问题:
- class 描述如下,这表明
cancel()
不能从流程外部使用:
Use the protected request(long) to request more items and cancel() to cancel the sequence from within an onNext implementation.
- TestSubscriber 不扩展 DisposableSubscriber。
您可以使用 Flowable.subscribeWith(Subscriber)
而不是 subscribe
,因此您的 Subscriber
是 return,而不是 void
。
在 RxJava 中 3.x TestSubscriber
不再实现 Disposable
。它确实实现了 dispose
and isDisposed
methods, as defined by BaseTestConsumer
, which it extends. However, both of those methods have been made protected
, so you can't actually use them directly. Luckily, there is TestSubscriber.cancel()
/TestSubscriber.isCancelled()
,它们是 public,等同于 dispose()
/isDisposed()
,因此您可以使用它们。
至于Flowable.subscribe
没有return一个Disposable
的原因,这个是在RxJava 2里改的,to adhere to the Reactive-Streams specification:
Due to the Reactive-Streams specification,
Publisher.subscribe
returnsvoid
...To remedy this, the methodE subscribeWith(E subscriber)
has been added to each base reactive class which returns its input subscriber/observer as is.