How to respond to BLE characteristic notifications by sending a new write command

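I'm updating an app to use RxAndroidBLE, and I'm struggling to translate my existing callback pattern into an Rx pattern. In particular, I need to respond to characteristic notifications in different ways depending on the data received, and send a specific write command back to the device (which then causes the characteristic to be updated again, in a loop).

The rationale is that the BLE device I'm integrating with has a special custom characteristic to which we can send different commands, and then listen for the data that comes back.

I've read a lot about chaining commands with RxBLE, but none of it seems to address my particular question: how do I send a command back to the device when a change notification is observed, given that the connection itself appears to be out of scope by the time we reach the subscribe block? What is the "Rx way" of doing this?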

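For clarity, here is the whole flow of my BLE service:

  1. Scan for devices with a filter for our custom characteristic
  2. Connect to a found device
  3. Read a couple of standard characteristics (strings) and store them in our data model
  4. If and only if one of those characteristics matches one of an array of strings, continue to step 5. Otherwise, dispose of the connection.
  5. Subscribe to our custom "control" characteristic ("CC") for change notifications
  6. Send command 1 to the CC. This should trigger answer 1 being set in the CC, so our handler is called
  7. Perform some calculations on answer 1 and save the results to our model. Send command 2 (which includes these modified values, so we can't determine it at compile time) to the CC. This should trigger answer 2 in the CC.
  8. On receiving answer 2, send command 3, which should trigger answer 3.
  9. On receiving answer 3, parse it to an int value. If answer 3 == 0, dispose of the connection: we're done.
  10. Otherwise (answer 3 > 0), send command 4. This will trigger answer 4.
  11. Perform some calculations on answer 4 and store the results in our model
  12. Then send command 5, which will actually trigger answer 3 (commands 5 and 3 both trigger answer 3). Since we are already subscribed to answer 3, this takes us back to step 9 above: we keep looping until answer 3 is 0 (i.e. we have saved all the data).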
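
Steps 6 to 12 amount to a small state machine: each answer determines the next command, and an answer 3 of 0 ends the exchange. Below is a minimal sketch of that decision logic in plain Java, deliberately independent of RxAndroidBle. All names (`CommandFlowSketch`, `nextCommand`, `fakeDeviceReply`) are hypothetical and only illustrate the flow described above; the real command payloads and parsing are not modeled.

```java
import java.util.Optional;

// Hypothetical sketch of the command/answer loop from steps 6-12.
// It models only which command follows which answer, not the BLE I/O.
final class CommandFlowSketch {
    enum Answer { ANSWER_1, ANSWER_2, ANSWER_3, ANSWER_4 }
    enum Command { COMMAND_1, COMMAND_2, COMMAND_3, COMMAND_4, COMMAND_5 }

    // Returns the next command to write, or empty when the exchange is done.
    // answer3Value is only consulted when the answer is ANSWER_3.
    static Optional<Command> nextCommand(Answer answer, int answer3Value) {
        switch (answer) {
            case ANSWER_1: return Optional.of(Command.COMMAND_2);
            case ANSWER_2: return Optional.of(Command.COMMAND_3);
            case ANSWER_3:
                // 0 means all data has been saved, so we stop
                return answer3Value > 0 ? Optional.of(Command.COMMAND_4) : Optional.empty();
            case ANSWER_4: return Optional.of(Command.COMMAND_5);
            default: throw new IllegalArgumentException("Unexpected answer: " + answer);
        }
    }

    // Stand-in for the device: commands 3 and 5 both trigger answer 3.
    static Answer fakeDeviceReply(Command command) {
        switch (command) {
            case COMMAND_1: return Answer.ANSWER_1;
            case COMMAND_2: return Answer.ANSWER_2;
            case COMMAND_3:
            case COMMAND_5: return Answer.ANSWER_3;
            case COMMAND_4: return Answer.ANSWER_4;
            default: throw new IllegalArgumentException("Unexpected command: " + command);
        }
    }

    public static void main(String[] args) {
        int remaining = 2; // assumed number of records still on the fake device
        Optional<Command> command = Optional.of(Command.COMMAND_1);
        while (command.isPresent()) {
            Command current = command.get();
            System.out.println("send " + current);
            if (current == Command.COMMAND_5) {
                remaining--; // one record has been consumed
            }
            command = nextCommand(fakeDeviceReply(current), remaining);
        }
    }
}
```

Keeping this mapping as a pure function makes it straightforward to call from inside whatever reactive operator ends up handling the notifications.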

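Edit: I'm loath to share code here, as I'm well aware that the approach below cannot possibly work, but I hope it describes what I'm trying to do, even if the syntax won't even compile: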

                  connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
                                                    .observeOn(AndroidSchedulers.mainThread())
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID))
                                                    .doOnNext(bytes -> {
                                                        //store the battery info in our model here
                                                    })
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
                                                    .doOnNext(bytes -> {
                                                                //store the serial number info in our model here
                                                                //TODO: how do we only proceed to the subscription if serialNumber is correct?
                                                            }
                                                    )
                                                    .flatMap(rxBleConnection -> rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID))
                                                    .doOnNext(notificationObservable -> {
                                                        // Notification has been set up
                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1); //we can't do this because rxBleConnection is out of scope!
                                                    })
                                                    .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
                                                    .subscribe(
                                                            bytes -> {
                                                                // The given characteristic has changed; here is the value.

                                                                switch(commandFromBytes(bytes)){
                                                                    case answer1:
                                                                        int newCommand = doSomethingWith(bytes);
                                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
                                                                        break;
                                                                    case answer2:
                                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
                                                                        break;
                                                                    case answer3:
                                                                        if(bytes <= 0){
                                                                            connectedDevice.connectionDisposable.dispose();
                                                                        }
                                                                        else{
                                                                            rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
                                                                        }
                                                                        break;
                                                                    case answer4:

                                                                            doSomethingLongWindedWith(bytes);
                                                                            //then
                                                                            rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
                                                                            //command 5 will cause answer3 to be notified, so we loop back above                                                                             
                                                                        break;
                                                                }

                                                            },
                                                            throwable -> {
                                                                // Handle an error here.
                                                            }
                                                    );

Edit 2: after playing bracket tango for a while, I think I'm close to a solution:

 connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
                                                    .observeOn(AndroidSchedulers.mainThread())
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID)
                                                            .doOnNext(bytes -> {
                                                                connectedDevice.setBatLevel(bytes);
                                                            })
                                                            .flatMapSingle(rxBleConnection2 -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
                                                            .doOnNext(bytes -> {
                                                                        connectedDevice.setSerialNum(bytes);
                                                                        //we also notify a singleton listener here
                                                                    }
                                                            )
                                                            .flatMap(rxBleConnection3 -> {
                                                                        if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
                                                                            rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID).subscribe(
                                                                                    bytes -> {
                                                                                        // The given characteristic has changed; here is the value.

                                                                                        switch (commandFromBytes(bytes)) {
                                                                                            case answer1:
                                                                                                int newCommand = doSomethingWith(bytes);
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
                                                                                                break;
                                                                                            case answer2:
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
                                                                                                break;
                                                                                            case answer3:
                                                                                                if (bytes <= 0) {
                                                                                                    //we also notify a singleton listener here
                                                                                                    connectedDevice.connectionDisposable.dispose();
                                                                                                } else {
                                                                                                    rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
                                                                                                }
                                                                                                break;
                                                                                            case answer4:

                                                                                                doSomethingLongWindedWith(bytes);
                                                                                                //then
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
                                                                                                //command 5 will cause answer3 to be notified, so we loop back above
                                                                                                break;
                                                                                        }
                                                                                    },
                                                                                    throwable -> {
                                                                                        // Handle an error here.
                                                                                    }
                                                                            );
                                                                        } else {
                                                                            connectedDevice.connectionDisposable.dispose();
                                                                        }
                                                                    }
                                                                            .doOnNext(notificationObservable -> {
                                                                                // Notification has been set up
                                                                                if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
                                                                                    rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1);
                                                                                }
                                                                            })
                                                            ));

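According to this talk by Jake Wharton, the best approach is to construct an Observable that emits only the values needed to update your model.

(The examples are in Kotlin.)

We could have these outputs from the Observable: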

sealed class ConnectionEvent {
    object CloseConnection : ConnectionEvent() // dummy event to notify when the connection can be closed
    data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
    data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
    data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}

The whole flow could look like this:

bleDevice.establishConnection(false)
        .flatMap { connection ->
            val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
            val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
            val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
                    .flatMapObservable {
                        when {
                            it != matchingSerialNumber -> Observable.just(ConnectionEvent.CloseConnection as ConnectionEvent) // close connection if serial does not match
                            else -> createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
                        }
                    }
            Observable.concat( // the actual flow of the whole connection
                    batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
                    serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
                    continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
            )
        }
        .takeWhile { it != ConnectionEvent.CloseConnection } // if the connection is to be closed -> unsubscribe
        .subscribe(
                { connectionEvent ->
                    when(connectionEvent) {
                        is ConnectionEvent.SerialNumber -> { /* Update model */ }
                        is ConnectionEvent.BatteryLevel -> { /* Update model */ }
                        is ConnectionEvent.Answer4 -> { /* Update model */ }
                    }
                },
                { /* handle errors */ }
        )

where the write/notification dance is done in:

private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
    return connection.setupNotification(customCharUuid)
            .flatMap { ccNotifications ->
                ccNotifications.flatMap {
                    when (answerFromBytes(it)) {
                        answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(it)).ignoreEmissions()
                        answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
                        answer3 -> when (it.isEmpty()) {
                            true -> Observable.just(ConnectionEvent.CloseConnection)
                            else -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
                        }
                        answer4 -> connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()
                                .startWith(Observable.just(ConnectionEvent.Answer4(it)))
                        else -> Observable.error(Exception("Unexpected answer! => ${answerFromBytes(it)}"))
                    }
                }
                        .startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // initiate with the command1
            }
}

For more clarity, I have used an extension function:

fun Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<ConnectionEvent>()

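Edit:

I have changed the code a bit to get rid of the CloseConnection event and leverage the completion of the observables instead. So now the outputs look like this: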

sealed class ConnectionEvent {
    data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
    data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
    data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}

The main flow:

bleDevice.establishConnection(false)
        .map { connection ->
            val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
            val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
            val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
                    .flatMapObservable {
                        if (it == matchingSerialNumber) createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
                        else Observable.empty() // do not continue if serial number does not match
                    }
            Observable.concat( // the actual flow of the whole connection
                    batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
                    serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
                    continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise we complete without emitting anything
            )
        }
        .publish {
            // create a Completable from the above Observable.concat()
            val dataDownloadCompletable = it.take(1) // take the first emission (there will be only one)
                    .flatMapCompletable { it.ignoreElements() } // and wait until the first emission completes
            it.takeUntil(dataDownloadCompletable.toObservable<Any>()) // when dataDownloadCompletable completes -> unsubscribe from the upstream, mainly .establishConnection() to close it
        }
        .flatMap { it } // unwrap the above flow
        .subscribe(
                { connectionEvent ->
                    when (connectionEvent) {
                        is ConnectionEvent.SerialNumber -> { /* Update model */ }
                        is ConnectionEvent.BatteryLevel -> { /* Update model */ }
                        is ConnectionEvent.Answer4 -> { /* Update model */ }
                    }
                },
                { /* handle errors */ }
        )

The write/notification part:

private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
    return connection.setupNotification(customCharUuid)
            .flatMap { ccNotifications ->
                ccNotifications.map { Pair(answerFromBytes(it), it) } // map every response to a pair of <answer, bytes>
                        .startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // and start with writing command1 to initiate the data exchange
            }
            .takeWhile { (answer, bytes) -> !(answer == answer3 && bytes.isEmpty()) } // end the createContinuationObservable on the first answer3 with an empty bytes
            .flatMap<ConnectionEvent> { (answer, bytes) ->
                when (answer) {
                    answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(bytes)).ignoreEmissions()
                    answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
                    answer3 -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
                    answer4 -> Observable.just(ConnectionEvent.Answer4(bytes)) // when answer4 is received emit actionable item to update the model
                            .concatWith(connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()) // and send the next command5
                    else -> Observable.error(Exception("Unexpected answer! => $answer"))
                }
            }
}

And the extension function:

fun <T> Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<T>()