RxJava take(1) 完成 wheras Observable.just() 没有
RxJava take(1) completes wheras Observable.just() does not
我使用RXAndroidBle 连接蓝牙设备。我使用 establishConnection
来获取可观察的连接,并希望将此可观察对象转换为可完成对象。此代码有效并且可完成的按预期完成:
connectionObservable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
...
startReadingData()
}
.doOnError { ... }
.take(1)
.ignoreElements()
而这永远不会完成:
connectionObservable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
...
startReadingData()
}
.doOnError { ... }
.flatMap { Observable.just(it) }
.ignoreElements() // flatMapCompletable { Completable.complete() } doesn't work either
所以我纯粹是出于兴趣问,为什么 Observable.just() 的 flatMap 不起作用,因为 Obsrevable.just() 也会立即完成?
问题
从未完成:
connectionObservable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
...
startReadingData()
}
.doOnError { ... }
.flatMap { Observable.just(it) }
.ignoreElements() // flatMapCompletable { Completable.complete() } doesn't work either
这个其实很简单。 connectionObservable
可能是无限的。它会调用 onNext,但不会调用 onComplete。下游操作员接收 onNext 发出并相应地处理它。 flatMap 运算符仅在上游和内部流发出 onComplete 时才完成。 flatMap 的 inner-stream 完成了,但 source-observable 还没有完成。因此,您永远不会收到终端消息。
完成
connectionObservable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
...
startReadingData()
}
.doOnError { ... }
.take(1)
.ignoreElements()
此流完成,因为有终端操作员。在这种情况下,您有一个 take(1)。 Take-Operator 是做什么的?它将等待从源发出的 onNext 并将其转换为 onNext(message) 和 onComplete()。您可以将带有 Observable.just 的 flatMap 添加为 take-Operator 下面的内部流,它仍然会完成。
Take 运算符实现
@Override
public void onNext(T t) {
if (!done && remaining-- > 0) {
boolean stop = remaining == 0;
downstream.onNext(t);
if (stop) {
onComplete();
}
}
}
RxJava2 中 Take-Operator 的实现如下所示。很明显,上游 onNext 将导致 onNext 并可能导致 onComplete(下游)。
我使用RXAndroidBle 连接蓝牙设备。我使用 establishConnection
来获取可观察的连接,并希望将此可观察对象转换为可完成对象。此代码有效并且可完成的按预期完成:
connectionObservable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
...
startReadingData()
}
.doOnError { ... }
.take(1)
.ignoreElements()
而这永远不会完成:
connectionObservable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
...
startReadingData()
}
.doOnError { ... }
.flatMap { Observable.just(it) }
.ignoreElements() // flatMapCompletable { Completable.complete() } doesn't work either
所以我纯粹是出于兴趣问,为什么 Observable.just() 的 flatMap 不起作用,因为 Obsrevable.just() 也会立即完成?
问题
从未完成:
connectionObservable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
...
startReadingData()
}
.doOnError { ... }
.flatMap { Observable.just(it) }
.ignoreElements() // flatMapCompletable { Completable.complete() } doesn't work either
这个其实很简单。 connectionObservable
可能是无限的。它会调用 onNext,但不会调用 onComplete。下游操作员接收 onNext 发出并相应地处理它。 flatMap 运算符仅在上游和内部流发出 onComplete 时才完成。 flatMap 的 inner-stream 完成了,但 source-observable 还没有完成。因此,您永远不会收到终端消息。
完成
connectionObservable
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
...
startReadingData()
}
.doOnError { ... }
.take(1)
.ignoreElements()
此流完成,因为有终端操作员。在这种情况下,您有一个 take(1)。 Take-Operator 是做什么的?它将等待从源发出的 onNext 并将其转换为 onNext(message) 和 onComplete()。您可以将带有 Observable.just 的 flatMap 添加为 take-Operator 下面的内部流,它仍然会完成。
Take 运算符实现
@Override
public void onNext(T t) {
if (!done && remaining-- > 0) {
boolean stop = remaining == 0;
downstream.onNext(t);
if (stop) {
onComplete();
}
}
}
RxJava2 中 Take-Operator 的实现如下所示。很明显,上游 onNext 将导致 onNext 并可能导致 onComplete(下游)。