如何通过发送新的写入命令来响应 BLE 特性通知
How to respond to BLE characteristic notifications by sending a new write command
我正在更新一个应用程序以使用 RxAndroidBLE,并且正在努力解决如何将我现有的回调模式转换为 Rx 模式的问题。特别是,我需要根据接收到的数据以不同的方式响应特征通知,并向设备发送特定的写命令(然后循环更新特征)。
这背后的基本原理是,我正在集成的 BLE 设备具有特殊的自定义特性,我们可以向其发送不同的命令,然后侦听返回的数据。
我已经阅读了很多关于使用 RxBLE 链接命令的内容,但是 none 似乎解决了我的特定查询,即如何在观察到更改通知时将命令发送回设备(自连接以来当我们到达可观察块时,它本身似乎超出了范围)。这样做的 "Rx Way" 是多少?
为清楚起见,这是我的 BLE 服务的整个流程:
- 使用我们自定义特征的过滤器扫描设备
- 连接到找到的设备
- 读取几个标准特征(字符串),并将它们存储在我们的数据模型中
- 当且仅当其中一个特征匹配字符串数组之一时,继续执行 5。否则,处理连接。
- 订阅我们的自定义 "control" 特征 ("CC") 以获取更改通知
- 发送命令 1 到 CC。这应该触发答案 1 在 CC 中设置,因此我们的处理程序被调用
- 对答案 1 进行一些计算并保存到我们的模型中。将命令 2(其中包括这些修改后的值,因此我们无法在编译时确定)发送到 CC。这应该会触发 CC 中的答案 2。
- 收到答案 2 后,发送命令 3,这应该会触发答案 3。
- 在收到答案 3 时,解析为一个 int 值。如果答案 3 == 0,则处理连接 - 我们完成了。
- 答案 3 > 0,所以发送命令 4。这将触发答案 4。
- 对答案 4 进行一些计算并将结果存储在我们的模型中
- 然后发送命令5,实际上会触发答案3(命令5和3都会触发答案3)。因为我们已经订阅了答案 3,所以这让我们回到第 9 步。上面 - 我们一直循环直到答案 3 为 0(即我们已经保存了所有数据)。
编辑:我很讨厌分享代码,因为我很清楚下面的方法不可能起作用——但我希望它描述了我正在尝试做的事情,即使语法不会'甚至编译:
connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
.observeOn(AndroidSchedulers.mainThread())
.flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID))
.doOnNext(bytes -> {
//store the battery info in our model here
})
.flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
.doOnNext(bytes -> {
//store the serial number info in our model here
//TODO: how do we only proceed to the subscription if serialNumber is correct?
}
)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID))
.doOnNext(notificationObservable -> {
// Notification has been set up
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1); //we can't do this because rxBleConnection is out of scope!
})
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(
bytes -> {
// Given characteristic has been changes, here is the value.
switch(commandFromBytes(bytes)){
case answer1:
int newCommand = doSomethingWith(bytes);
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
break;
case answer2:
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
break;
case answer3:
if(bytes <= 0){
connectedDevice.connectionDisposable.dispose();
}
else{
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
}
break;
case answer4:
doSomethingLongWindedWith(bytes);
//then
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
//command 5 will cause answer3 to be notified, so we loop back above
break;
}
},
throwable -> {
// Handle an error here.
}
);
编辑 2:玩了一会儿括号探戈后,我想我已经接近解决方案了:
connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
.observeOn(AndroidSchedulers.mainThread())
.flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID)
.doOnNext(bytes -> {
connectedDevice.setBatLevel(bytes);
})
.flatMapSingle(rxBleConnection2 -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
.doOnNext(bytes -> {
connectedDevice.setSerialNum(bytes);
//we also notify a singleton listener here
}
)
.flatMap(rxBleConnection3 -> {
if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID).subscribe(
bytes -> {
// Given characteristic has been changes, here is the value.
switch (commandFromBytes(bytes)) {
case answer1:
int newCommand = doSomethingWith(bytes);
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
break;
case answer2:
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
break;
case answer3:
if (bytes <= 0) {
//we also notify a singleton listener here
connectedDevice.connectionDisposable.dispose();
} else {
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
}
break;
case answer4:
doSomethingLongWindedWith(bytes);
//then
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
//command 5 will cause answer3 to be notified, so we loop back above
break;
}
},
throwable -> {
// Handle an error here.
}
);
} else {
connectedDevice.connectionDisposable.dispose();
}
}
.doOnNext(notificationObservable -> {
// Notification has been set up
if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1);
}
})
));
根据 this Jake Wharton's talk 的说法,最好的方法是构建一个 Observable
,它只会发出更新模型所需的值。
(Kotlin 中的示例)
我们可以得到 Observable
的这些输出:
sealed class ConnectionEvent {
object CloseConnection : ConnectionEvent() // dummy event to notify when the connection can be closed
data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}
整个流程可能如下所示:
bleDevice.establishConnection(false)
.flatMap { connection ->
val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
.flatMapObservable {
when {
it != matchingSerialNumber -> Observable.just(ConnectionEvent.CloseConnection as ConnectionEvent) // close connection if serial does not match
else -> createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
}
}
Observable.concat( // the actual flow of the whole connection
batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
)
}
.takeWhile { it != ConnectionEvent.CloseConnection } // if the connection is to be closed -> unsubscribe
.subscribe(
{ connectionEvent ->
when(connectionEvent) {
is ConnectionEvent.SerialNumber -> { /* Update model */ }
is ConnectionEvent.BatteryLevel -> { /* Update model */ }
is ConnectionEvent.Answer4 -> { /* Update model */ }
}
},
{ /* handle errors */ }
)
write/notification 舞蹈在哪里:
private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
return connection.setupNotification(customCharUuid)
.flatMap { ccNotifications ->
ccNotifications.flatMap {
when (answerFromBytes(it)) {
answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(it)).ignoreEmissions()
answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
answer3 -> when (it.isEmpty()) {
true -> Observable.just(ConnectionEvent.CloseConnection)
else -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
}
answer4 -> connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()
.startWith(Observable.just(ConnectionEvent.Answer4(it)))
else -> Observable.error(Exception("Unexpected answer! => ${answerFromBytes(it)}"))
}
}
.startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // initiate with the command1
}
}
为了更清晰,我使用了扩展函数:
fun Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<ConnectionEvent>()
编辑:
我稍微更改了代码以摆脱 CloseConnection
事件并利用可观察对象的完成。所以现在输出看起来像这样:
sealed class ConnectionEvent {
data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}
主要流程:
bleDevice.establishConnection(false)
.map { connection ->
val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
.flatMapObservable {
if (it == matchingSerialNumber) createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
else Observable.empty() // do not continue if serial number does not match
}
Observable.concat( // the actual flow of the whole connection
batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
)
}
.publish {
// create a Completable from the above Observable.concat()
val dataDownloadCompletable = it.take(1) // take the first emission (there will be only one)
.flatMapCompletable { it.ignoreElements() } // and wait until the first emission completes
it.takeUntil(dataDownloadCompletable.toObservable<Any>()) // when dataDownloadCompletable completes —> unsubscribe from the upstream, mainly .establishConnection() to close it
}
.flatMap { it } // unwrap the above flow
.subscribe(
{ connectionEvent ->
when (connectionEvent) {
is ConnectionEvent.SerialNumber -> { /* Update model */ }
is ConnectionEvent.BatteryLevel -> { /* Update model */ }
is ConnectionEvent.Answer4 -> { /* Update model */ }
}
},
{ /* handle errors */ }
)
Write/notification部分:
private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
return connection.setupNotification(customCharUuid)
.flatMap { ccNotifications ->
ccNotifications.map { Pair(answerFromBytes(it), it) } // map every response to a pair of <answer, bytes>
.startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // and start with writing command1 to initiate the data exchange
}
.takeWhile { (answer, bytes) -> !(answer == answer3 && bytes.isEmpty()) } // end the createContinuationObservable on the first answer3 with an empty bytes
.flatMap<ConnectionEvent> { (answer, bytes) ->
when (answer) {
answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(bytes)).ignoreEmissions()
answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
answer3 -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
answer4 -> Observable.just(ConnectionEvent.Answer4(bytes)) // when answer4 is received emit actionable item to update the model
.concatWith(connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()) // and send the next command5
else -> Observable.error(Exception("Unexpected answer! => $answer"))
}
}
}
以及扩展名:
fun <T> Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<T>()
我正在更新一个应用程序以使用 RxAndroidBLE,并且正在努力解决如何将我现有的回调模式转换为 Rx 模式的问题。特别是,我需要根据接收到的数据以不同的方式响应特征通知,并向设备发送特定的写命令(然后循环更新特征)。
这背后的基本原理是,我正在集成的 BLE 设备具有特殊的自定义特性,我们可以向其发送不同的命令,然后侦听返回的数据。
我已经阅读了很多关于使用 RxBLE 链接命令的内容,但是 none 似乎解决了我的特定查询,即如何在观察到更改通知时将命令发送回设备(自连接以来当我们到达可观察块时,它本身似乎超出了范围)。这样做的 "Rx Way" 是多少?
为清楚起见,这是我的 BLE 服务的整个流程:
- 使用我们自定义特征的过滤器扫描设备
- 连接到找到的设备
- 读取几个标准特征(字符串),并将它们存储在我们的数据模型中
- 当且仅当其中一个特征匹配字符串数组之一时,继续执行 5。否则,处理连接。
- 订阅我们的自定义 "control" 特征 ("CC") 以获取更改通知
- 发送命令 1 到 CC。这应该触发答案 1 在 CC 中设置,因此我们的处理程序被调用
- 对答案 1 进行一些计算并保存到我们的模型中。将命令 2(其中包括这些修改后的值,因此我们无法在编译时确定)发送到 CC。这应该会触发 CC 中的答案 2。
- 收到答案 2 后,发送命令 3,这应该会触发答案 3。
- 在收到答案 3 时,解析为一个 int 值。如果答案 3 == 0,则处理连接 - 我们完成了。
- 答案 3 > 0,所以发送命令 4。这将触发答案 4。
- 对答案 4 进行一些计算并将结果存储在我们的模型中
- 然后发送命令5,实际上会触发答案3(命令5和3都会触发答案3)。因为我们已经订阅了答案 3,所以这让我们回到第 9 步。上面 - 我们一直循环直到答案 3 为 0(即我们已经保存了所有数据)。
编辑:我很讨厌分享代码,因为我很清楚下面的方法不可能起作用——但我希望它描述了我正在尝试做的事情,即使语法不会'甚至编译:
connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
.observeOn(AndroidSchedulers.mainThread())
.flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID))
.doOnNext(bytes -> {
//store the battery info in our model here
})
.flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
.doOnNext(bytes -> {
//store the serial number info in our model here
//TODO: how do we only proceed to the subscription if serialNumber is correct?
}
)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID))
.doOnNext(notificationObservable -> {
// Notification has been set up
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1); //we can't do this because rxBleConnection is out of scope!
})
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(
bytes -> {
// Given characteristic has been changes, here is the value.
switch(commandFromBytes(bytes)){
case answer1:
int newCommand = doSomethingWith(bytes);
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
break;
case answer2:
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
break;
case answer3:
if(bytes <= 0){
connectedDevice.connectionDisposable.dispose();
}
else{
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
}
break;
case answer4:
doSomethingLongWindedWith(bytes);
//then
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
//command 5 will cause answer3 to be notified, so we loop back above
break;
}
},
throwable -> {
// Handle an error here.
}
);
编辑 2:玩了一会儿括号探戈后,我想我已经接近解决方案了:
connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
.observeOn(AndroidSchedulers.mainThread())
.flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID)
.doOnNext(bytes -> {
connectedDevice.setBatLevel(bytes);
})
.flatMapSingle(rxBleConnection2 -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
.doOnNext(bytes -> {
connectedDevice.setSerialNum(bytes);
//we also notify a singleton listener here
}
)
.flatMap(rxBleConnection3 -> {
if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID).subscribe(
bytes -> {
// Given characteristic has been changes, here is the value.
switch (commandFromBytes(bytes)) {
case answer1:
int newCommand = doSomethingWith(bytes);
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
break;
case answer2:
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
break;
case answer3:
if (bytes <= 0) {
//we also notify a singleton listener here
connectedDevice.connectionDisposable.dispose();
} else {
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
}
break;
case answer4:
doSomethingLongWindedWith(bytes);
//then
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
//command 5 will cause answer3 to be notified, so we loop back above
break;
}
},
throwable -> {
// Handle an error here.
}
);
} else {
connectedDevice.connectionDisposable.dispose();
}
}
.doOnNext(notificationObservable -> {
// Notification has been set up
if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1);
}
})
));
根据 this Jake Wharton's talk 的说法,最好的方法是构建一个 Observable
,它只会发出更新模型所需的值。
(Kotlin 中的示例)
我们可以得到 Observable
的这些输出:
sealed class ConnectionEvent {
object CloseConnection : ConnectionEvent() // dummy event to notify when the connection can be closed
data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}
整个流程可能如下所示:
bleDevice.establishConnection(false)
.flatMap { connection ->
val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
.flatMapObservable {
when {
it != matchingSerialNumber -> Observable.just(ConnectionEvent.CloseConnection as ConnectionEvent) // close connection if serial does not match
else -> createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
}
}
Observable.concat( // the actual flow of the whole connection
batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
)
}
.takeWhile { it != ConnectionEvent.CloseConnection } // if the connection is to be closed -> unsubscribe
.subscribe(
{ connectionEvent ->
when(connectionEvent) {
is ConnectionEvent.SerialNumber -> { /* Update model */ }
is ConnectionEvent.BatteryLevel -> { /* Update model */ }
is ConnectionEvent.Answer4 -> { /* Update model */ }
}
},
{ /* handle errors */ }
)
write/notification 舞蹈在哪里:
private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
return connection.setupNotification(customCharUuid)
.flatMap { ccNotifications ->
ccNotifications.flatMap {
when (answerFromBytes(it)) {
answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(it)).ignoreEmissions()
answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
answer3 -> when (it.isEmpty()) {
true -> Observable.just(ConnectionEvent.CloseConnection)
else -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
}
answer4 -> connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()
.startWith(Observable.just(ConnectionEvent.Answer4(it)))
else -> Observable.error(Exception("Unexpected answer! => ${answerFromBytes(it)}"))
}
}
.startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // initiate with the command1
}
}
为了更清晰,我使用了扩展函数:
fun Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<ConnectionEvent>()
编辑:
我稍微更改了代码以摆脱 CloseConnection
事件并利用可观察对象的完成。所以现在输出看起来像这样:
sealed class ConnectionEvent {
data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}
主要流程:
bleDevice.establishConnection(false)
.map { connection ->
val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
.flatMapObservable {
if (it == matchingSerialNumber) createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
else Observable.empty() // do not continue if serial number does not match
}
Observable.concat( // the actual flow of the whole connection
batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
)
}
.publish {
// create a Completable from the above Observable.concat()
val dataDownloadCompletable = it.take(1) // take the first emission (there will be only one)
.flatMapCompletable { it.ignoreElements() } // and wait until the first emission completes
it.takeUntil(dataDownloadCompletable.toObservable<Any>()) // when dataDownloadCompletable completes —> unsubscribe from the upstream, mainly .establishConnection() to close it
}
.flatMap { it } // unwrap the above flow
.subscribe(
{ connectionEvent ->
when (connectionEvent) {
is ConnectionEvent.SerialNumber -> { /* Update model */ }
is ConnectionEvent.BatteryLevel -> { /* Update model */ }
is ConnectionEvent.Answer4 -> { /* Update model */ }
}
},
{ /* handle errors */ }
)
Write/notification部分:
private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
return connection.setupNotification(customCharUuid)
.flatMap { ccNotifications ->
ccNotifications.map { Pair(answerFromBytes(it), it) } // map every response to a pair of <answer, bytes>
.startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // and start with writing command1 to initiate the data exchange
}
.takeWhile { (answer, bytes) -> !(answer == answer3 && bytes.isEmpty()) } // end the createContinuationObservable on the first answer3 with an empty bytes
.flatMap<ConnectionEvent> { (answer, bytes) ->
when (answer) {
answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(bytes)).ignoreEmissions()
answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
answer3 -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
answer4 -> Observable.just(ConnectionEvent.Answer4(bytes)) // when answer4 is received emit actionable item to update the model
.concatWith(connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()) // and send the next command5
else -> Observable.error(Exception("Unexpected answer! => $answer"))
}
}
}
以及扩展名:
fun <T> Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<T>()