RxAndroidBle 状态机设置通知产生无限循环 onDoNext

RxAndroidBle state machine setup notification produce infinite loop onDoNext

目前我正在尝试重构 my source code 以使用 RxAndroidBle 库。

我实现了一个状态机,它调用每个蓝牙成功操作(通知、写入、读取等)下一个状态。只要蓝牙设备是 运行.

,就应该始终建立 rxBleConnection

我的设置通知实现如下所示:

protected void setNotificationOn(UUID characteristic) {
    if (isConnected()) {
        final Disposable disposable = connectionObservable
                .flatMap(rxBleConnection -> rxBleConnection.setupNotification(characteristic))
                .doOnNext(notificationObservable -> {
                        Timber.d("Successful set notification on for %s", BluetoothGattUuid.prettyPrint(characteristic));
                        nextMachineStateStep();
                        }
                )
                .flatMap(notificationObservable -> notificationObservable)
                .observeOn(AndroidSchedulers.mainThread())
                .retry(BT_RETRY_TIMES_ON_ERROR)
                .subscribe(
                        bytes -> {
                            onBluetoothNotify(characteristic, bytes);
                            Timber.d("onCharacteristicChanged %s: %s",
                                    BluetoothGattUuid.prettyPrint(characteristic),
                                    byteInHex(bytes));
                            },
                        throwable -> onError(throwable)
                );

        compositeDisposable.add(disposable);
    }
}

如果我只为它工作的特性设置一次通知,但如果发生错误或者如果我尝试再次设置它,我将陷入 doOnNext 的无限循环并且方法 setNotificationOn 永远不会终止。我以为 doOnNext 只在通知设置成功时调用一次!? (永远不会抛出 BleConflictingNotificationAlreadySetException 或其他异常!?)

如何在同一个特征上重复设置通知?

是否有更好的方法来使用 RxAndroidBle/RxJava2 创建状态机? (我需要依次调用各种蓝牙操作)

你会发现我的完整实现here

编辑:

However if one would try to subscribe again to .setupNotification() for the same characteristic both subscribers will share the same notification.

感谢您的解释,因此不会再次调用 doOnNext(),因为如果我再次订阅 setupNotification(),通知已经成功设置。

是否可以检查通知是否已设置(以便我可以跳过此步骤继续我的下一个机器状态)?

why your API looks like this or what exactly you want to achieve

我想实现通过RxAndroidBle的蓝牙通信被抽象化,这样class BluetoothCommunication的子类就可以只使用这些方法:

protected void writeBytes(UUID characteristic, byte[] bytes)
protected void readBytes(UUID characteristic)
protected void setIndicationOn(UUID characteristic)
protected void setNotificationOn(UUID characteristic)
/*  Method is triggered if a Bluetooth data is read from a device. */
protected void onBluetoothRead(UUID characteristic, byte[] value) {}
/* Method is triggered if a Bluetooth data from a device is notified */
protected void onBluetoothNotify(UUID characteristic, byte[] value) {}

设置蓝牙通信顺序。蓝牙通信/序列/算法总是不同的,不能参数化。但它总是顺序的。要初始化蓝牙秤,我首先需要设置通知 A,然后通知 B。其次,我必须发送命令 C,之后我将收到一个或多个通知呼叫,之后我必须发送命令 B,依此类推。我支持来自不同供应商的许多蓝牙体重秤,命令/设置通知和算法的数量总是不同的。

您可以看到具有各种算法的子类here

In my experience RxJava2 does not need to have an additional state machine as it is already a library that handles state

如何抽象子类的库句柄状态?

I thought doOnNext is only called once on successful notification setup!?

在您的情况下,此 .doOnNext() 将在建立连接后为每个成功的通知设置调用。如果连接因任何原因中断,.retry() 操作员将重新订阅 connectionObservable,这可能会启动一个新连接,从而触发新的通知设置。

结论:您的 .doOnNext() 每个 .subscribe()

最多可能被调用 BT_RETRY_TIMES_ON_ERROR

How can I repeatedly setup a notification on the same characteristic?

只要订阅了 .setupNotification(),通知就会激活,无需重新设置。但是,如果有人尝试再次订阅 .setupNotification() 以获取相同的特征,则两个订阅者将共享相同的通知。在您的示例中,通知将一直处于活动状态,直到处理 compositeDisposable 或连接中断(在这种情况下,将为每个订阅建立最多 BT_RETRY_TIMES_ON_ERROR 次新连接,如上所述)。

Is there a better way to create a state machine with RxAndroidBle/RxJava2? (I need to call various Bluetooth operation sequential)

RxJava2 允许以许多不同的方式(以及顺序)构建状态转换和订阅特定 Observable 的时间表,例如

Disposable disposable = Observable.concat(Arrays.asList(
        Observable.just("start").doOnSubscribe(disposable1 -> Log.d("Subscribe", "start")),
        Observable.just("0").delay(3, TimeUnit.SECONDS).doOnSubscribe(disposable1 -> Log.d("Subscribe", "0")),
        Observable.just("1").delay(2, TimeUnit.SECONDS).doOnSubscribe(disposable1 -> Log.d("Subscribe", "1")),
        Observable.just("2").delay(1, TimeUnit.SECONDS).doOnSubscribe(disposable1 -> Log.d("Subscribe", "2")),
        Observable.just("end").doOnSubscribe(disposable1 -> Log.d("Subscribe", "end"))
))
        .subscribe(s -> Log.d("Result", s));

// 2019-01-04 12:45:13.364 31887-31887/ D/Subscribe: start
// 2019-01-04 12:45:13.364 31887-31887/ D/Result: start
// 2019-01-04 12:45:13.364 31887-31887/ D/Subscribe: 0
// 2019-01-04 12:45:16.366 31887-32529/ D/Result: 0
// 2019-01-04 12:45:16.367 31887-32529/ D/Subscribe: 1
// 2019-01-04 12:45:18.367 31887-32535/ D/Result: 1
// 2019-01-04 12:45:18.368 31887-32535/ D/Subscribe: 2
// 2019-01-04 12:45:19.369 31887-32537/ D/Result: 2
// 2019-01-04 12:45:19.371 31887-32537/ D/Subscribe: end
// 2019-01-04 12:45:19.372 31887-32537/ D/Result: end

还有其他运算符也可以使进程按顺序进行。

一个更合适的问题是为什么你的 API 看起来像这样或者你到底想达到什么目的,例如

  • 是否符合指定的外部API?
  • 该过程是否由外部实体控制?
  • 是否可以在外部 API 级别进行简化(即像 .getMeasurementData(device, callbackForResult) 这样的单一方法)?
  • 蓝牙通信过程是可参数化的还是算法总是一样的?

根据我的经验,RxJava2 不需要额外的状态机,因为它已经是一个处理状态的库。一旦知道了需求,就可以以一种完全满足需要的方式对 RxJava2 运算符链进行建模。

Is it possible to check if the notification is already setup (so that I can skip this step to go on to my next machine state)?

不,无法检查它 — 但调用者知道它是否已经设置。