RxAndroidBle 在订阅多个特征时崩溃
RxAndroidBle crash on subscribe to multiple characteristics
我目前正在为专有蓝牙低功耗设备开发 Android 应用程序。
我决定使用 RxAndroidBle,与内置蓝牙堆栈相比,我很满意它的相对易用性。
我 运行 遇到的问题:我需要订阅两个特征并不断读取和汇总它们的值。
查看此示例页面,http://polidea.github.io/RxAndroidBle/,我已经能够按照示例读取多个特征,但未能成功合并多个订阅。
这是我得到的:
subscriptionA = device.establishConnection(this, false)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(aUUID))
.doOnNext(notificationObservable -> {
// Notification has been set up
})
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(bytes -> {
// Given characteristic has been changes, here is the value.
System.out.printf("Received 03: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
});
subscriptionB = device.establishConnection(this, false)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(bUUID))
.doOnNext(notificationObservable -> {
// Notification has been set up
})
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(bytes -> {
// Given characteristic has been changes, here is the value.
System.out.printf("Received 05: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
});
一到第二个订阅就崩溃了。我做错了什么?
这是错误:
17:47:33.444 27758-27758/com.exam E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.exam, PID: 27758
rx.exceptions.OnErrorNotImplementedException
at rx.Observable.onError(Observable.java:7923)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
at rx.internal.operators.OperatorMap.onError(OperatorMap.java:48)
at rx.internal.operators.OperatorDoOnEach.onError(OperatorDoOnEach.java:71)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
at rx.internal.operators.OperatorMap.onError(OperatorMap.java:48)
at rx.observers.Subscribers.onError(Subscribers.java:225)
at rx.Observable$ThrowObservable.call(Observable.java:9984)
at rx.Observable$ThrowObservable.call(Observable.java:9974)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.subscribe(Observable.java:8191)
at rx.Observable.subscribe(Observable.java:8158)
at rx.Observable.subscribe(Observable.java:7914)
at com.exam.BTConnection.enableConnection(BTConnection.java:65)
at com.exam.MActivity.onServiceConnected(mActivity.java:104)
at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208)
at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5254)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
at de.robv.android.xposed.XposedBridge.main(XposedBridge.java:115)
Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}
at com.polidea.rxandroidble.internal.RxBleDeviceImpl.lambda$establishConnection(RxBleDeviceImpl.java:54)
at com.polidea.rxandroidble.internal.RxBleDeviceImpl.access$lambda[=11=](RxBleDeviceImpl.java)
at com.polidea.rxandroidble.internal.RxBleDeviceImpl$$Lambda.call(Unknown Source)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.subscribe(Observable.java:8191)
at rx.Observable.subscribe(Observable.java:8158)
at rx.Observable.subscribe(Observable.java:7914)
at com.exam.BTConnection.enableConnection(BTConnection.java:65)
at com.exam.MActivity.onServiceConnected(MActivity.java:104)
at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208)
at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5254)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
你犯了两个错误:
- 您尝试与同一台设备建立两个连接。您会在日志中看到这一点 (
Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}
)。 BLE 连接是点对点类型,您不能同时打开同一设备的两个连接(与 HTTP 不同)。
- 您没有处理连接期间可能发生的异常。您也可以在日志中看到这一点 (
rx.exceptions.OnErrorNotImplementedException
)
你应该怎么做:
subscription = device.establishConnection(this, false)
.flatMap(rxBleConnection -> Observable.combineLatest( // use the same connection and combine latest emissions
rxBleConnection.setupNotification(aUUID).<byte[]>flatMap(observable -> observable), // sometimes IDE get's lost in what type is returned from an Observable - that's why I added <byte[]>
rxBleConnection.setupNotification(bUUID).<byte[]>flatMap(observable -> observable),
Pair::new // merge into a Pair
))
.subscribe(
byteArrayPair -> {
// here you get the latest values from notifications
byte[] aBytes = byteArrayPair.first;
byte[] bBytes = byteArrayPair.second;
// do your thing
},
throwable -> {
// handle errors
}
);
此致
我目前正在为专有蓝牙低功耗设备开发 Android 应用程序。
我决定使用 RxAndroidBle,与内置蓝牙堆栈相比,我很满意它的相对易用性。
我 运行 遇到的问题:我需要订阅两个特征并不断读取和汇总它们的值。
查看此示例页面,http://polidea.github.io/RxAndroidBle/,我已经能够按照示例读取多个特征,但未能成功合并多个订阅。
这是我得到的:
subscriptionA = device.establishConnection(this, false)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(aUUID))
.doOnNext(notificationObservable -> {
// Notification has been set up
})
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(bytes -> {
// Given characteristic has been changes, here is the value.
System.out.printf("Received 03: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
});
subscriptionB = device.establishConnection(this, false)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(bUUID))
.doOnNext(notificationObservable -> {
// Notification has been set up
})
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(bytes -> {
// Given characteristic has been changes, here is the value.
System.out.printf("Received 05: %d\n\tdata: %s\n", bytes.length, Arrays.toString(bytes));
});
一到第二个订阅就崩溃了。我做错了什么?
这是错误:
17:47:33.444 27758-27758/com.exam E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.exam, PID: 27758
rx.exceptions.OnErrorNotImplementedException
at rx.Observable.onError(Observable.java:7923)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:159)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:120)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
at rx.internal.operators.OperatorMap.onError(OperatorMap.java:48)
at rx.internal.operators.OperatorDoOnEach.onError(OperatorDoOnEach.java:71)
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:240)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:776)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:537)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:526)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:250)
at rx.internal.operators.OperatorMap.onError(OperatorMap.java:48)
at rx.observers.Subscribers.onError(Subscribers.java:225)
at rx.Observable$ThrowObservable.call(Observable.java:9984)
at rx.Observable$ThrowObservable.call(Observable.java:9974)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.subscribe(Observable.java:8191)
at rx.Observable.subscribe(Observable.java:8158)
at rx.Observable.subscribe(Observable.java:7914)
at com.exam.BTConnection.enableConnection(BTConnection.java:65)
at com.exam.MActivity.onServiceConnected(mActivity.java:104)
at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208)
at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5254)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
at de.robv.android.xposed.XposedBridge.main(XposedBridge.java:115)
Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}
at com.polidea.rxandroidble.internal.RxBleDeviceImpl.lambda$establishConnection(RxBleDeviceImpl.java:54)
at com.polidea.rxandroidble.internal.RxBleDeviceImpl.access$lambda[=11=](RxBleDeviceImpl.java)
at com.polidea.rxandroidble.internal.RxBleDeviceImpl$$Lambda.call(Unknown Source)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.call(Observable.java:162)
at rx.Observable.call(Observable.java:154)
at rx.Observable.subscribe(Observable.java:8191)
at rx.Observable.subscribe(Observable.java:8158)
at rx.Observable.subscribe(Observable.java:7914)
at com.exam.BTConnection.enableConnection(BTConnection.java:65)
at com.exam.MActivity.onServiceConnected(MActivity.java:104)
at android.app.LoadedApk$ServiceDispatcher.doConnected(LoadedApk.java:1208)
at android.app.LoadedApk$ServiceDispatcher$RunConnection.run(LoadedApk.java:1225)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5254)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
你犯了两个错误:
- 您尝试与同一台设备建立两个连接。您会在日志中看到这一点 (
Caused by: BleAlreadyConnectedException{macAddress=35:32:30:30:44:53}
)。 BLE 连接是点对点类型,您不能同时打开同一设备的两个连接(与 HTTP 不同)。 - 您没有处理连接期间可能发生的异常。您也可以在日志中看到这一点 (
rx.exceptions.OnErrorNotImplementedException
)
你应该怎么做:
subscription = device.establishConnection(this, false)
.flatMap(rxBleConnection -> Observable.combineLatest( // use the same connection and combine latest emissions
rxBleConnection.setupNotification(aUUID).<byte[]>flatMap(observable -> observable), // sometimes IDE get's lost in what type is returned from an Observable - that's why I added <byte[]>
rxBleConnection.setupNotification(bUUID).<byte[]>flatMap(observable -> observable),
Pair::new // merge into a Pair
))
.subscribe(
byteArrayPair -> {
// here you get the latest values from notifications
byte[] aBytes = byteArrayPair.first;
byte[] bBytes = byteArrayPair.second;
// do your thing
},
throwable -> {
// handle errors
}
);
此致