RxJava Zip 两个可观察对象而不完成它们

RxJava Zip two observables without completing them

我有两个可观察对象:第一个来自库 RxAndroidBle:

Observable<RxBleConnection> bluetoothObservable = RxBleClient.create(getBaseContext()).getBleDevice(macAddress)
.establishConnection(false)

连接到设备并在有订阅者时保持连接,另一个

Observable<Response> serverObservable = Observable.fromCallable(() -> callServer())

然后我将它们压缩在一起

bluetoothObservable.zipWith(serverObservable , (rxBleConnection, s) -> {
                                Log.d(TAG, "zip done");
                                return "mock result";
                            }).subscribe((s) -> {},
                                    Throwable::printStackTrace);

但是在 zip bluetoothObservable 取消订阅后连接立即断开。我应该怎么做才能压缩这些可观察量并保持 bluetoothObservable alive/subscribed?

您可以使用:

而不是 .zip()
Observable.combineLatest(
  bluetoothObservable, 
  serverObservable,
  (rxBleConnection, s) -> {
    Log.d(TAG, "combined");
    return "mock result";
  }
)
  .subscribe(
    (s) -> {},
    Throwable::printStackTrace
  )

解释:zip 尝试将两个 Observable 的输出一一组合。如果其中一个将完成并且该 Observable 之前的所有排放都匹配 - 没有必要保持对另一个 Observable 的订阅,因为后续排放将不会被使用。 combineLatest 只是尝试将来自两个 Observable 的所有排放量组合成对,即 最新