RxAndroidBle stopped when discovering another BLE device

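I am using RxAndroidBle to discover the services and a few characteristics of nearby devices. My problem is that after unsubscribing from the connection to the first device, establishing a connection to another device does not work. For the second device, the state is always "Connection State: RxBleConnectionState{DISCONNECTED}".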

Here is the code. Could anyone help me take a look?

private Subscription createDeviceConnectionSubscription(final RxBleDevice rxBleDevice) {
    bInternalStatus = DISCOVERRING;
    Log.d(LOG_TAG, "start to discover the services on: " + rxBleDevice.getMacAddress().toString());

    Subscription subscription = rxBleDevice.establishConnection(false)
            .timeout(10, TimeUnit.SECONDS)
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.from(mWorker.getLooper()))
            .flatMap(new Func1<RxBleConnection, Observable<gattConnectionResults>>() {
                @Override
                public Observable<gattConnectionResults> call(RxBleConnection rxBleConnection) {
                    return Observable.zip(
                            rxBleConnection.discoverServices(3, TimeUnit.SECONDS),
                            rxBleConnection.readCharacteristic(UUID.fromString("UUID1")),
                            rxBleConnection.readCharacteristic(UUID.fromString("UUID2")),
                            new Func3<RxBleDeviceServices, byte[], byte[], gattConnectionResults>() {
                                @Override
                                public gattConnectionResults call(RxBleDeviceServices rxBleDeviceServices, byte[] bytes, byte[] bytes2) {
                                    //......
                                    return results;
                                }
                            }
                    );
                }
            })
            .subscribe(subscriber);

    rxBleDevice.observeConnectionStateChanges()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<RxBleConnection.RxBleConnectionState>() {
                @Override
                public void call(RxBleConnection.RxBleConnectionState rxBleConnectionState) {
                    Log.d("Gatt Connection State", ""+rxBleConnectionState);
                }
            });
    return subscription;
}

Subscriber<gattConnectionResults> subscriber = new Subscriber<gattConnectionResults>() {
    @Override
    public void onCompleted() {
        Log.d(LOG_TAG, "RXXX on completed called!!!");
        if(mDeviceConnectionSubscription!=null){
            handleConnectionOver();
        }
    }

    @Override
    public void onError(Throwable e) {
        Log.d(LOG_TAG, "RXXX Connection error!!! " + e);
        if(mDeviceConnectionSubscription!=null){
            handleConnectionOver();
        }
    }

    @Override
    public void onNext(gattConnectionResults results) {

    }
};

private void handleConnectionOver(){
    bInternalStatus = STOPPED;
    if(mDeviceConnectionSubscription!=null && !mDeviceConnectionSubscription.isUnsubscribed()){
        mDeviceConnectionSubscription.unsubscribe();
        mDeviceConnectionSubscription = null;
    }
}

Thanks to @s-noopy. After moving the unsubscribe call to the end of onNext(), the RxBle log is:

05-15 D/RxBle#Radio:   QUEUED RxBleRadioOperationConnect(192750313)
05-15 D/RxBle#Radio:  STARTED RxBleRadioOperationConnect(192750313)
05-15 V/RxBle#BleConnectionCompat: Connecting without reflection
05-15 D/RxBle#BluetoothGatt: onConnectionStateChange newState=2 status=0
05-15 D/RxBle#Radio: FINISHED RxBleRadioOperationConnect(192750313)
05-15 D/RxBle#Radio:   QUEUED RxBleRadioOperationServicesDiscover(178598822)
05-15 D/RxBle#Radio:  STARTED RxBleRadioOperationServicesDiscover(178598822)
05-15 D/RxBle#BluetoothGatt: onServicesDiscovered status=0
05-15 D/RxBle#Radio:   QUEUED RxBleRadioOperationCharacteristicRead(241700341)
05-15 D/RxBle#Radio:   QUEUED RxBleRadioOperationCharacteristicRead(24572440)
05-15 D/RxBle#Radio: FINISHED RxBleRadioOperationServicesDiscover(178598822)
05-15 D/RxBle#Radio:  STARTED RxBleRadioOperationCharacteristicRead(241700341)
05-15 D/RxBle#BluetoothGatt: onCharacteristicRead characteristic=00002a00-0000-1000-8000-00805f9b34fb status=0
05-15 D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicRead(241700341)
05-15 D/RxBle#Radio:  STARTED RxBleRadioOperationCharacteristicRead(24572440)
05-15 D/RxBle#BluetoothGatt: onCharacteristicRead characteristic=00002a29-0000-1000-8000-00805f9b34fb status=0
05-15 D/RxBle#Radio:   QUEUED RxBleRadioOperationDisconnect(204169407)
05-15 D/RxBle#Radio: FINISHED RxBleRadioOperationCharacteristicRead(24572440)
05-15 D/RxBle#Radio:  STARTED RxBleRadioOperationDisconnect(204169407)
05-15 D/RxBle#BluetoothGatt: onConnectionStateChange newState=0 status=0
05-15 D/RxBle#Radio: FINISHED RxBleRadioOperationDisconnect(204169407)

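You should create a new Subscriber for each Subscription. A Subscriber is stateful: once it receives onCompleted or onError, it is unsubscribed and will no longer subscribe to any other Observable.

Alternatively, you should use an Observer instead of a Subscriber. An Observer is stateless.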
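To illustrate why reusing one Subscriber fails, here is a minimal sketch in plain Java. It does not use RxJava or RxAndroidBle: StatefulSubscriber and emitTo are hypothetical stand-ins that only model the "terminated once, terminated forever" behavior described above, so treat it as an illustration of the idea rather than as the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscriberReuseSketch {

    /** Hypothetical stand-in for a stateful subscriber (not an RxJava class). */
    static final class StatefulSubscriber {
        private final List<String> received = new ArrayList<>();
        private boolean unsubscribed = false;

        void onNext(String item) {
            // Once terminated, every later event is silently dropped.
            if (!unsubscribed) {
                received.add(item);
            }
        }

        void onCompleted() {
            // The terminal event flips the state permanently.
            unsubscribed = true;
        }

        List<String> received() {
            return received;
        }
    }

    /** Hypothetical stand-in for a source: emits one item, then completes. */
    static void emitTo(StatefulSubscriber subscriber, String item) {
        subscriber.onNext(item);
        subscriber.onCompleted();
    }

    /** Reuses one subscriber for two sources, as in the question. */
    static List<String> reuseOneSubscriber() {
        StatefulSubscriber shared = new StatefulSubscriber();
        emitTo(shared, "device-1");
        emitTo(shared, "device-2"); // dropped: shared is already terminated
        return shared.received();
    }

    /** Creates a fresh subscriber for every source, as the answer suggests. */
    static List<String> freshSubscriberPerSubscription() {
        List<String> all = new ArrayList<>();
        for (String device : new String[] {"device-1", "device-2"}) {
            StatefulSubscriber fresh = new StatefulSubscriber();
            emitTo(fresh, device);
            all.addAll(fresh.received());
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(reuseOneSubscriber());             // [device-1]
        System.out.println(freshSubscriberPerSubscription()); // [device-1, device-2]
    }
}
```

Applied to the question's code, this would mean building the Subscriber inside createDeviceConnectionSubscription (or through a small factory method) instead of keeping a single subscriber field that is shared across devices.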