RxJava take(1) 完成 wheras Observable.just() 没有

RxJava take(1) completes wheras Observable.just() does not

我使用RXAndroidBle 连接蓝牙设备。我使用 establishConnection 来获取可观察的连接,并希望将此可观察对象转换为可完成对象。此代码有效并且可完成的按预期完成:

connectionObservable
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext {
                ...
                startReadingData()
            }
            .doOnError { ... }
            .take(1)
            .ignoreElements()

而这永远不会完成:

connectionObservable
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext {
                ...
                startReadingData()
            }
            .doOnError { ... }
            .flatMap { Observable.just(it) }
            .ignoreElements()  // flatMapCompletable { Completable.complete() } doesn't work either

所以我纯粹是出于兴趣问,为什么 Observable.just() 的 flatMap 不起作用,因为 Obsrevable.just() 也会立即完成?

问题

从未完成:

connectionObservable
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext {
                ...
                startReadingData()
            }
            .doOnError { ... }
            .flatMap { Observable.just(it) }
            .ignoreElements()  // flatMapCompletable { Completable.complete() } doesn't work either

这个其实很简单。 connectionObservable 可能是无限的。它会调用 onNext,但不会调用 onComplete。下游操作员接收 onNext 发出并相应地处理它。 flatMap 运算符仅在上游和内部流发出 onComplete 时才完成。 flatMap 的 inner-stream 完成了,但 source-observable 还没有完成。因此,您永远不会收到终端消息。

完成

connectionObservable
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext {
                ...
                startReadingData()
            }
            .doOnError { ... }
            .take(1)
            .ignoreElements()

此流完成,因为有终端操作员。在这种情况下,您有一个 take(1)。 Take-Operator 是做什么的?它将等待从源发出的 onNext 并将其转换为 onNext(message) 和 onComplete()。您可以将带有 Observable.just 的 flatMap 添加为 take-Operator 下面的内部流,它仍然会完成。

Take 运算符实现

    @Override
    public void onNext(T t) {
        if (!done && remaining-- > 0) {
            boolean stop = remaining == 0;
            downstream.onNext(t);
            if (stop) {
                onComplete();
            }
        }
    }

RxJava2 中 Take-Operator 的实现如下所示。很明显,上游 onNext 将导致 onNext 并可能导致 onComplete(下游)。