从同一个 Observable (RxAndroidBle) 观察多次
Observe many times from same Observable (RxAndroidBle)
我正在使用 RxAndroidBle 库和 RxJava2 来读取 BLE 特性。我认为这个问题只是一个 RxJava 问题,但包括我正在使用的细节 RxAndroidBle
以防有用。
我获得连接,然后用它来调用 readCharacteristic()
,它本身 returns 一个 Single<ByteArray>
。在这一点上,我不只是想得到一个 ByteArray
。这个特性需要多次读取,因为BLE设备设置为让我返回一个小文件,而特性一次只能返回20个字节,因此我需要重复读取。
是否可以修改此代码,使 returns 下面的 switchMap()
和 Observable
会发出许多 ByteArrays
,而不是只发出一个?
我是 RxJava 新手。
val connection: Observable<RxBleConnection> = selectedDevice.record.bleDevice.establishConnection(false, Timeout(30, TimeUnit.SECONDS))
return connection
.subscribeOn(Schedulers.io())
.switchMap {
// I want to get an Observable that can read multiple times here.
it.readCharacteristic(serverCertCharacteristicUUID).toObservable()
}
.doOnNext {
Timber.e("Got Certificate bytes")
}
.map {
String(it as ByteArray)
}
.doOnNext {
Timber.e("Got certificate: $it")
}
.singleOrError()
您可以使用通知来缓冲您的数据。
device.establishConnection(false)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(characteristicUuid))
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(
bytes -> {
// Given characteristic has been changes, here is the value.
},
throwable -> {
// Handle an error here.
}
);
要重复多次读取直到发出特定值,需要更改此部分:
// I want to get an Observable that can read multiple times here.
it.readCharacteristic(serverCertCharacteristicUUID).toObservable()
类似于 suggested by the RxJava author in the first answer that google gives for phrase rxjava single repeat
:
// this will repeat until a `checkRepeatIf` returns false
Observable.defer {
val successValue = AtomicReference<ByteArray>()
connection.readCharacteristic(serverCertCharacteristicUUID)
.doOnSuccess { successValue.lazySet(it) }
.repeatWhen { completes -> completes.takeWhile { checkRepeatIf(successValue.get()) } }
}
我可以通过发送一个信号来停止 connectionObservable
和蓝牙特性的读取来使它正常工作。值得注意的是你需要在 repeat()
之后调用 toObservable()
否则这不起作用,虽然我不知道为什么。
override fun readMultipartCharacteristic(macAddress: String): Single<String> {
val CERTIFICATE_TERMINATOR = 0x30.toByte()
val device = bluetoothService.getBleDevice(macAddress)
if (connectionObservable == null || !device.connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED)) {
connectionObservable = device.establishConnection(false, Timeout(30, TimeUnit.SECONDS))
}
val stop: PublishSubject<Unit> = PublishSubject.create()
return connectionObservable!!
.subscribeOn(Schedulers.io())
.takeUntil(stop)
.switchMap {
it.readCharacteristic(UUID("my-uuid"))
.repeat()
.toObservable()
.takeUntil(stop)
}
.collectInto(ByteArrayOutputStream(), { buffer, byteArray ->
// Watch for the signal of the end of the stream
if (byteArray.size == 1 && byteArray.get(0).equals(CERTIFICATE_TERMINATOR)) {
stop.onComplete()
} else {
buffer.write(byteArray)
}
})
.map {
String(it.toByteArray())
}
}
我正在使用 RxAndroidBle 库和 RxJava2 来读取 BLE 特性。我认为这个问题只是一个 RxJava 问题,但包括我正在使用的细节 RxAndroidBle
以防有用。
我获得连接,然后用它来调用 readCharacteristic()
,它本身 returns 一个 Single<ByteArray>
。在这一点上,我不只是想得到一个 ByteArray
。这个特性需要多次读取,因为BLE设备设置为让我返回一个小文件,而特性一次只能返回20个字节,因此我需要重复读取。
是否可以修改此代码,使 returns 下面的 switchMap()
和 Observable
会发出许多 ByteArrays
,而不是只发出一个?
我是 RxJava 新手。
val connection: Observable<RxBleConnection> = selectedDevice.record.bleDevice.establishConnection(false, Timeout(30, TimeUnit.SECONDS))
return connection
.subscribeOn(Schedulers.io())
.switchMap {
// I want to get an Observable that can read multiple times here.
it.readCharacteristic(serverCertCharacteristicUUID).toObservable()
}
.doOnNext {
Timber.e("Got Certificate bytes")
}
.map {
String(it as ByteArray)
}
.doOnNext {
Timber.e("Got certificate: $it")
}
.singleOrError()
您可以使用通知来缓冲您的数据。
device.establishConnection(false)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(characteristicUuid))
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(
bytes -> {
// Given characteristic has been changes, here is the value.
},
throwable -> {
// Handle an error here.
}
);
要重复多次读取直到发出特定值,需要更改此部分:
// I want to get an Observable that can read multiple times here.
it.readCharacteristic(serverCertCharacteristicUUID).toObservable()
类似于 suggested by the RxJava author in the first answer that google gives for phrase rxjava single repeat
:
// this will repeat until a `checkRepeatIf` returns false
Observable.defer {
val successValue = AtomicReference<ByteArray>()
connection.readCharacteristic(serverCertCharacteristicUUID)
.doOnSuccess { successValue.lazySet(it) }
.repeatWhen { completes -> completes.takeWhile { checkRepeatIf(successValue.get()) } }
}
我可以通过发送一个信号来停止 connectionObservable
和蓝牙特性的读取来使它正常工作。值得注意的是你需要在 repeat()
之后调用 toObservable()
否则这不起作用,虽然我不知道为什么。
override fun readMultipartCharacteristic(macAddress: String): Single<String> {
val CERTIFICATE_TERMINATOR = 0x30.toByte()
val device = bluetoothService.getBleDevice(macAddress)
if (connectionObservable == null || !device.connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED)) {
connectionObservable = device.establishConnection(false, Timeout(30, TimeUnit.SECONDS))
}
val stop: PublishSubject<Unit> = PublishSubject.create()
return connectionObservable!!
.subscribeOn(Schedulers.io())
.takeUntil(stop)
.switchMap {
it.readCharacteristic(UUID("my-uuid"))
.repeat()
.toObservable()
.takeUntil(stop)
}
.collectInto(ByteArrayOutputStream(), { buffer, byteArray ->
// Watch for the signal of the end of the stream
if (byteArray.size == 1 && byteArray.get(0).equals(CERTIFICATE_TERMINATOR)) {
stop.onComplete()
} else {
buffer.write(byteArray)
}
})
.map {
String(it.toByteArray())
}
}