RxAndroidBle crashes when subscribing to multiple characteristics

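I am currently developing an Android app for a proprietary Bluetooth Low Energy device.

I decided to use RxAndroidBle, and I am pleased with how easy it is to use compared with the built-in Bluetooth stack.

The problem I have run into: I need to subscribe to two characteristics and continuously read and aggregate their values.

Looking at the examples page, http://polidea.github.io/RxAndroidBle/, I have been able to read multiple characteristics by following the example, but I have not managed to combine multiple subscriptions.

Here is what I have: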

    subscriptionA = device.establishConnection(this, false)
            .flatMap(rxBleConnection -> rxBleConnection.setupNotification(aUUID))
            .doOnNext(notificationObservable -> {
                // Notification has been set up
            })
            .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
            .subscribe(bytes -> {
                // The given characteristic has changed; here is the value.
                System.out.printf("Received 03: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
            });
    subscriptionB = device.establishConnection(this, false)
            .flatMap(rxBleConnection -> rxBleConnection.setupNotification(bUUID))
            .doOnNext(notificationObservable -> {
                // Notification has been set up
            })
            .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
            .subscribe(bytes -> {
                // The given characteristic has changed; here is the value.
                System.out.printf("Received 05: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
            });

It crashes as soon as it reaches the second subscription. What am I doing wrong?

Here is the error:

17:47:33.444 27758-27758/com.exam E/AndroidRuntime: FATAL EXCEPTION: main
                                                                      Process: com.exam, PID: 27758
                                                                      rx.exceptions.OnErrorNotImplementedException
                                                                          at rx.Observable.onError(Observable.java:7923)
                                                                          at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
                                                                          at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
                                                                          at rx.internal.operators.OperatorMap.onError(OperatorMap.java:48)
                                                                          at rx.internal.operators.OperatorDoOnEach.onError(OperatorDoOnEach.java:71)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
                                                                          at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
                                                                          at rx.internal.operators.OperatorMap.onError(OperatorMap.java:48)
                                                                          at rx.observers.Subscribers.onError(Subscribers.java:225)
                                                                          at rx.Observable$ThrowObservable.call(Observable.java:9984)
                                                                          at rx.Observable$ThrowObservable.call(Observable.java:9974)
                                                                          at rx.Observable.unsafeSubscribe(Observable.java:8098)
                                                                          at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
                                                                          at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
                                                                          at rx.Observable.call(Observable.java:162)
                                                                          at rx.Observable.call(Observable.java:154)
                                                                          at rx.Observable.call(Observable.java:162)
                                                                          at rx.Observable.call(Observable.java:154)
                                                                          at rx.Observable.call(Observable.java:162)
                                                                          at rx.Observable.call(Observable.java:154)
                                                                          at rx.Observable.call(Observable.java:162)
                                                                          at rx.Observable.call(Observable.java:154)
                                                                          at rx.Observable.call(Observable.java:162)
                                                                          at rx.Observable.call(Observable.java:154)
                                                                          at rx.Observable.subscribe(Observable.java:8191)
                                                                          at rx.Observable.subscribe(Observable.java:8158)
                                                                          at rx.Observable.subscribe(Observable.java:7914)
                                                                          at com.exam.BTConnection.enableConnection(BTConnection.java:65)
                                                                          at com.exam.MActivity.onServiceConnected(mActivity.java:104)
                                                                          at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208)
                                                                          at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225)
                                                                          at android.os.Handler.handleCallback(Handler.java:739)
                                                                          at android.os.Handler.dispatchMessage(Handler.java:95)
                                                                          at android.os.Looper.loop(Looper.java:135)
                                                                          at android.app.ActivityThread.main(ActivityThread.java:5254)
                                                                          at java.lang.reflect.Method.invoke(Native Method)
                                                                          at java.lang.reflect.Method.invoke(Method.java:372)
                                                                          at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
                                                                          at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
                                                                          at de.robv.android.xposed.XposedBridge.main(XposedBridge.java:115)
                                                                       Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}
                                                                          at com.polidea.rxandroidble.internal.RxBleDeviceImpl.lambda$establishConnection(RxBleDeviceImpl.java:54)
                                                                          at com.polidea.rxandroidble.internal.RxBleDeviceImpl.access$lambda$0(RxBleDeviceImpl.java)
                                                                          at com.polidea.rxandroidble.internal.RxBleDeviceImpl$$Lambda.call(Unknown Source)
                                                                          at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
                                                                          at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35) 
                                                                          at rx.Observable.call(Observable.java:162) 
                                                                          at rx.Observable.call(Observable.java:154) 
                                                                          at rx.Observable.call(Observable.java:162) 
                                                                          at rx.Observable.call(Observable.java:154) 
                                                                          at rx.Observable.call(Observable.java:162) 
                                                                          at rx.Observable.call(Observable.java:154) 
                                                                          at rx.Observable.call(Observable.java:162) 
                                                                          at rx.Observable.call(Observable.java:154) 
                                                                          at rx.Observable.call(Observable.java:162) 
                                                                          at rx.Observable.call(Observable.java:154) 
                                                                          at rx.Observable.subscribe(Observable.java:8191) 
                                                                          at rx.Observable.subscribe(Observable.java:8158) 
                                                                          at rx.Observable.subscribe(Observable.java:7914) 
                                                                          at com.exam.BTConnection.enableConnection(BTConnection.java:65) 
                                                                          at com.exam.MActivity.onServiceConnected(MActivity.java:104) 
                                                                          at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208) 
                                                                          at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225) 
                                                                          at android.os.Handler.handleCallback(Handler.java:739) 
                                                                          at android.os.Handler.dispatchMessage(Handler.java:95) 
                                                                          at android.os.Looper.loop(Looper.java:135) 
                                                                          at android.app.ActivityThread.main(ActivityThread.java:5254) 
                                                                          at java.lang.reflect.Method.invoke(Native Method) 
                                                                          at java.lang.reflect.Method.invoke(Method.java:372) 

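You are making two mistakes:

  1. You are trying to establish two connections to the same device. You can see this in the log (Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}). A BLE connection is point-to-point, so you cannot have two connections to the same device open at the same time (unlike HTTP).
  2. You are not handling exceptions that may occur during the connection. You can see this in the log as well (rx.exceptions.OnErrorNotImplementedException), which RxJava throws when an error reaches a subscriber that was given no error handler.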
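
To make both points concrete without any BLE hardware, here is a minimal plain-Java sketch. `SingleConnectionDevice` is a hypothetical stand-in I made up for illustration, not part of RxAndroidBle; it only models the idea that a second connection attempt fails while the first is still open, and that the failure is delivered to an error callback the caller has to supply.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for a BLE device that allows one open connection at a time. */
final class SingleConnectionDevice {
    private boolean connected = false;

    /** Tries to open a connection; failures go to onError instead of being thrown. */
    void connect(Runnable onConnected, Consumer<Throwable> onError) {
        if (connected) {
            // Models the point-to-point constraint: the second attempt is rejected.
            onError.accept(new IllegalStateException("already connected"));
            return;
        }
        connected = true;
        onConnected.run();
    }

    /** Closes the connection so a later connect() can succeed again. */
    void disconnect() {
        connected = false;
    }
}
```

Calling `connect` twice in a row on the same instance runs the first success callback and then hands an `IllegalStateException` to the second caller's error callback. In the question's code the second `subscribe` has no such callback, which is roughly why the error surfaces as a crash.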

What you should do:

    subscription = device.establishConnection(this, false)
            .flatMap(rxBleConnection -> Observable.combineLatest( // use the same connection and combine latest emissions
                    rxBleConnection.setupNotification(aUUID).<byte[]>flatMap(observable -> observable), // sometimes the IDE loses track of the type returned from an Observable, hence the explicit <byte[]>
                    rxBleConnection.setupNotification(bUUID).<byte[]>flatMap(observable -> observable),
                    Pair::new // merge into a Pair
            ))
            .subscribe(
                    byteArrayPair -> {
                        // here you get the latest values from notifications
                        byte[] aBytes = byteArrayPair.first;
                        byte[] bBytes = byteArrayPair.second; 
                        // do your thing
                    },
                    throwable -> {
                        // handle errors
                    }
            );
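
One thing to keep in mind with `combineLatest`: it emits nothing until both notifications have delivered at least one value, and after that it emits on every new value from either side, paired with the most recent value from the other. The plain-Java sketch below is a simplified model of that behaviour, not RxJava's implementation; `LatestPairCombiner` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of combineLatest for two byte[] sources (not RxJava's implementation). */
final class LatestPairCombiner {
    private byte[] latestA; // null until source A has emitted
    private byte[] latestB; // null until source B has emitted
    private final List<String> emitted = new ArrayList<>();

    /** Records a new value from source A and emits a pair if B has a value too. */
    void onA(byte[] value) {
        latestA = value;
        emitIfReady();
    }

    /** Records a new value from source B and emits a pair if A has a value too. */
    void onB(byte[] value) {
        latestB = value;
        emitIfReady();
    }

    private void emitIfReady() {
        if (latestA == null || latestB == null) {
            return; // one side has not produced anything yet
        }
        emitted.add(Arrays.toString(latestA) + " + " + Arrays.toString(latestB));
    }

    /** Returns every pair emitted so far, in order. */
    List<String> emitted() {
        return emitted;
    }
}
```

If one of your characteristics notifies rarely, you will see no output at all until its first notification arrives, and afterwards its last value is reused for every pair. If you need each notification on its own instead of pairs, a merge-style operator on the same connection may fit better.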

Best regards