如何在 RxAndroidBLE 中 enable/disable 到 notification/indication

How to enable/disable to notification/indication in RxAndroidBLE

我正在创建一个 RxJava2 链,我想在其中启用和禁用通知。我设置的流程如下

  1. 建立连接。
  2. 将通知设置为 READ_STATUS UUID
  3. 如果返回的字节是zero然后执行一个写字节01WRITE_STATUS UUID并且在WRITE_STATUS之后,启用READ_STATUS UUID的通知来验证它有字节值 1.
  4. 否则,如果返回的字节是 1,则只需启用其他指标(UUID1UUID2UUD3)并读取值。

我在第 2 步和第 3 步遇到问题,我通过启用通知读取 READ_STATUS UUID 的值。为了重新读取值,我可能需要禁用通知然后再次启用它。为了禁用通知,我必须处理那个特定的 setupNotification .

代码如下

  connectDisposable=

                    device.establishConnection(false)

                   .flatMap(rxBleConnection -> {

                       rxBleConnection.discoverServices();
                       mRxBleConnection = rxBleConnection;
                       return Observable.just(rxBleConnection);
                   })
                   .flatMap(rxBleConnection ->mRxBleConnection.setupNotification(READ_STATUS,NotificationSetupMode.QUICK_SETUP).flatMap(it->it))
                   .takeUntil(bytes -> { 
                        
                        if(getByteValue(bytes)==0)
                            return false;// dispose above to disable the notification
                        else
                            return true; // no need to disable the notification and continue writing
                    })
                            

                     .flatMap(bytes -> {

                        return Observable.zip(

                                mRxBleConnection.writeCharacteristic(WRITE_STATUS, new byte[]{1}).toObservable(),
                                // setupNotification again to check whether read status has 1 or not 
                                mRxBleConnection.setupNotification(READ_STATUS, NotificationSetupMode.QUICK_SETUP).flatMap(it->it),
                                Pair::new

                        );
                    })
                     .flatMap(bytes ->{

                        byte [] val= bytes.first;
                        if(getByteValue(val) == 1){

                            return Observable.zip(
                                    mRxBleConnection.setupIndication(HISTORY, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG,"Here 1 ")).flatMap(it -> it),
                                    mRxBleConnection.setupIndication(PARAMCHECK, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG,"Here 2 ")).flatMap(it -> it),
                                    mRxBleConnection.setupIndication(FAULTINFO, NotificationSetupMode.QUICK_SETUP).doOnNext(observable -> Log.e(TAG,"Here 3 ")).flatMap(it -> it),
                                  
                                    Data::Readings);
                        }
                       
                        return Observable.empty();
                    }).subscribe(data -> {
                    
                    });

此代码的问题是我的 takeUntil 在最后触发它没有处理之前的 setupNotificaion 操作,以便我稍后可以重新阅读它。

我尝试了 线程中提到的解决方案,但不幸的是我没有分享 RxBleConnection

要取消订阅或处理 setupNotificationsetupIndication 可以使用以下代码。我确信可能有不同的方法,但到目前为止我能找到这个

private Observable<Pair<byte[],byte[]>> getValueFromIndication(RxBleConnection rxBleConnection){

         final PublishSubject<Boolean> unsubscribeOperation= PublishSubject.create();


        return Observable.zip(

                rxBleConnection.setupIndication(TSDictionary.FAULT_RETRY_COUNT_SEQUENCE,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeOperation),
                rxBleConnection.setupIndication(TSDictionary.FAULT_RETRY_INFORMATION,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeOperation),

                (bytes, bytes2) -> {

                    unsubscribeOperation.onNext(true);

                    return Pair.create(bytes,bytes2);
                }
        );
}

在上面的代码中,我正在压缩两个指示操作,一旦我从中获得值,我就会使用 PublishSubjecttakeUntil.

取消订阅更改链

The problem with this code is my takeUntil is firing at the last it does not dispose the previous setupNotificaion operation so that I can re read it later.

问题是你的条件倒置了。来自 .takeUntil() Javadoc:

 * @return an Observable that first emits items emitted by the source Observable, checks the specified
 *         condition after each item, and then completes when the condition is satisfied.

您使用过:

                   .takeUntil(bytes -> { 
                        
                        if(getByteValue(bytes)==0)
                            return false;// dispose above to disable the notification
                        else
                            return true; // no need to disable the notification and continue writing
                    })

当上游应该被处理时应该满足的地方(return 真):

                   .takeUntil(bytes -> { 
                        
                        if(getByteValue(bytes)==0)
                            return true;// dispose above to disable the notification
                        else
                            return false; // no need to disable the notification and continue writing
                    })