Using RxAndroidBle, how do I subscribe to responses from writing to a characteristic?
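The BLE device I'm connecting to emits bytes on one of its GATT characteristics in response to writes to that characteristic. Clients are supposed to enable notifications on the characteristic and interpret the changed bytes. (The behavior I'm controlling is turning on a scanning service for nearby wireless networks, then listening to the service output.)
I'm using RxAndroidBle and following the examples. I have an active connection Observable. The characteristic I want to observe has a UUID called AP_SCAN_DATA. It is supposed to emit 0xFE in response to receiving a written 0xFF.
How do I call setupNotification and set up an Observer on it to catch the emitted byte[]s, and then write a value to the characteristic, so that I can catch the response?
My best effort so far: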
connectionObservable.observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<RxBleConnection>() {
        @Override
        public void onCompleted() { // ignore...
        }

        @Override
        public void onError(Throwable e) { // ignore...
        }

        @Override
        public void onNext(final RxBleConnection connection) {
            Observable.just(connection)
                .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
                    @Override
                    public Observable<Observable<byte[]>> call(RxBleConnection connection) {
                        return connection.setupNotification(AP_SCAN_DATA);
                    }
                })
                .doOnNext(new Action1<Observable<byte[]>>() {
                    @Override
                    public void call(Observable<byte[]> observable) {
                        Log.i(TAG, "notification has been set up");
                        // This code logs on DEBUG that a write was made, but no response ever arrives
                        connection.writeCharacteristic(AP_SCAN_DATA, CharacteristicValue.RESET.asBytes())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe();
                    }
                })
                .flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                    @Override
                    public Observable<byte[]> call(Observable<byte[]> observable) {
                        return observable;
                    }
                })
                .doOnNext(new Action1<byte[]>() {
                    @Override
                    public void call(byte[] bytes) {
                        Log.i(TAG, "want to read response bytes here, but I don't... " + HexString.bytesToHex(bytes));
                    }
                })
                .subscribe();
        }
    });
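The question does not show the CharacteristicValue type passed to writeCharacteristic. As a rough illustration only, it might be a small enum like the one below. The name CharacteristicValueSketch and the mapping of RESET to 0xFF are assumptions, based solely on the statement above that 0xFF is the value written.

```java
// Hypothetical sketch: the real CharacteristicValue type is not shown in the question.
enum CharacteristicValueSketch {
    // Assumed mapping: the question says the written value is 0xFF.
    RESET(0xFF);

    private final byte value;

    CharacteristicValueSketch(int value) {
        // Narrow to a signed Java byte; 0xFF is stored as -1.
        this.value = (byte) value;
    }

    /** Returns a fresh single-byte payload that could be passed to a characteristic write. */
    byte[] asBytes() {
        return new byte[] {value};
    }

    public static void main(String[] args) {
        byte[] payload = RESET.asBytes();
        // Mask with 0xFF to print the unsigned value.
        System.out.println(payload.length + " " + (payload[0] & 0xFF)); // prints "1 255"
    }
}
```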
There is already a topic in which you may find some insight -> RxAndroidBle keeping a persistant connection + Write/Notification handling
This is how you could achieve the same result while using only a single .subscribe():
connectionObservable
    .flatMap( // when the connection is available...
        rxBleConnection -> rxBleConnection.setupNotification(AP_SCAN_DATA), // ... setup the notification...
        (rxBleConnection, apScanDataNotificationObservable) -> Observable.combineLatest( // ... when the notification is setup...
            rxBleConnection.writeCharacteristic(AP_SCAN_DATA, writeValue), // ... write the characteristic...
            apScanDataNotificationObservable.first(), // ... and observe for the first notification on the AP_SCAN_DATA
            (writtenBytes, responseBytes) -> responseBytes // ... when both will appear return just the response bytes...
        )
    )
    .flatMap(observable -> observable) // ... flatMap the result as it is Observable<byte[]>...
    .first() // ... and finish after first response is received to cleanup notifications
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        responseBytes -> { /* consume the response here */ },
        throwable -> { /* handle exception */ }
    );
FYI - you should handle errors in every .subscribe() unless you are 100% sure that the Observable does not emit errors.
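For the "consume the response here" step, one detail that is easy to get wrong in Java is comparing a received byte against a literal such as 0xFE, because Java bytes are signed. The sketch below is plain Java and not part of RxAndroidBle. The class name ScanResponseCheck is hypothetical, and it assumes the expected 0xFE arrives as the first byte of the notification payload, which the question does not state explicitly.

```java
// Hypothetical helper, not part of RxAndroidBle.
final class ScanResponseCheck {
    // Value taken from the question: the device should answer a written 0xFF with 0xFE.
    static final byte EXPECTED_RESPONSE = (byte) 0xFE;

    private ScanResponseCheck() {
    }

    /** Returns true if the payload is non-empty and starts with the expected response byte. */
    static boolean isExpectedResponse(byte[] responseBytes) {
        if (responseBytes == null || responseBytes.length == 0) {
            return false;
        }
        // Java bytes are signed, so compare against a byte constant.
        // Writing `responseBytes[0] == 0xFE` would compare -2 with 254 and never match.
        return responseBytes[0] == EXPECTED_RESPONSE;
    }

    public static void main(String[] args) {
        System.out.println(isExpectedResponse(new byte[] {(byte) 0xFE})); // prints "true"
        System.out.println(isExpectedResponse(new byte[] {(byte) 0xFF})); // prints "false"
        System.out.println(isExpectedResponse(new byte[0]));              // prints "false"
    }
}
```

Inside the success callback this could be called as ScanResponseCheck.isExpectedResponse(responseBytes) before acting on the response.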
For readers who are not using a Java version that supports lambdas, here is my implementation of @s_noopy's answer.
connectionObservable
    .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
        @Override
        public Observable<Observable<byte[]>> call(RxBleConnection connection) {
            return connection.setupNotification(AP_SCAN_DATA);
        }
    }, new Func2<RxBleConnection, Observable<byte[]>, Observable<byte[]>>() {
        @Override
        public Observable<byte[]> call(RxBleConnection connection, Observable<byte[]> apScanDataNotificationObservable) {
            return Observable.combineLatest(
                connection.writeCharacteristic(AP_SCAN_DATA, CharacteristicValue.RESET.asBytes()),
                apScanDataNotificationObservable.first(),
                new Func2<byte[], byte[], byte[]>() {
                    @Override
                    public byte[] call(byte[] writtenBytes, byte[] responseBytes) {
                        return responseBytes;
                    }
                }
            );
        }
    })
    .flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
        @Override
        public Observable<byte[]> call(Observable<byte[]> observable) {
            return observable;
        }
    })
    .first()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<byte[]>() {
        @Override
        public void call(byte[] bytes) {
            Log.i(TAG, "notification response...." + HexString.bytesToHex(bytes));
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            logError(throwable);
        }
    });
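The HexString.bytesToHex helper used in the log statements is not shown anywhere in this thread. A minimal plain-Java sketch of what such a helper might look like follows. The class name HexStringSketch and the uppercase, separator-free output format are assumptions, not the original implementation.

```java
// Hypothetical stand-in for the HexString helper referenced in the logging calls.
final class HexStringSketch {
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    private HexStringSketch() {
    }

    /** Formats each byte as two uppercase hex digits with no separators. */
    static String bytesToHex(byte[] bytes) {
        char[] out = new char[bytes.length * 2];
        for (int i = 0; i < bytes.length; i++) {
            // Mask with 0xFF so the signed byte is treated as a value in 0..255.
            int value = bytes[i] & 0xFF;
            out[i * 2] = HEX_DIGITS[value >>> 4];       // high nibble
            out[i * 2 + 1] = HEX_DIGITS[value & 0x0F];  // low nibble
        }
        return new String(out);
    }

    public static void main(String[] args) {
        System.out.println(bytesToHex(new byte[] {(byte) 0xFF, (byte) 0xFE, 0x0A})); // prints "FFFE0A"
    }
}
```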