如何以线性方式连接两个可观察的操作(首先做这件事,然后做完第二件事)?

How to concatenate two observable operations in a linear fashion (do first this thing and after that one finishes do the second thing)?

Polidea 发布了一个新的handy library called RxAndroidBle,这对于处理使用普通蓝牙 API 时的许多问题非常有用。

在解释更多之前,我的想法是拥有一个 POJO 模型,其中包含设备发送(或我查询)给我的所有最新值(在本例中由 Map 对象表示):

如果我想收到有关多个特征通知的通知,我可以这样做:

final UUID serviceUuid = // your service UUID
final Map<UUID, byte[]> genericModel = new HashMap<>();
final Observable<RxBleConnection> connectionObservable = // your connectionObservable

connectionObservable
        .flatMap(connection -> 
                connection.discoverServices()
                        .flatMap(services -> services.getService(serviceUuid).map(BluetoothGattService::getCharacteristics)) // get characteristics you're interested in
                        .flatMap(Observable::from) // deal with every characteristic separately
                        .flatMap(characteristic -> connection
                                        .setupNotification(characteristic) // setup notification for each
                                        .flatMap(observable -> observable), // to get the raw bytes from notification
                                Pair::new) // merge characteristic with byte[] to keep track from which characteristic the bytes came
        )
        .subscribe(
                pair -> genericModel.put(pair.first.getUuid(), pair.second),
                throwable -> { /* handle errors */}
        );

现在,当我连接到设备时,通知正在更新 POJO 对象(在这个特定示例中为 Map)。

如果我想读取值,我可以执行以下操作:

 connectionObservable.flatMap(connection ->
    connection.discoverServices()
        .flatMap(services -> services.getService(SERVICE_ID).map(BluetoothGattService::getCharacteristics))
        .flatMap(Observable::from)
        .filter(characteristic -> BleUtils.hasReadProperty(characteristic.getProperties()))
        .flatMap(connection::readCharacteristic, Pair::new)
)
    .subscribe(
            pair -> genericModel.put(pair.first.getUuid(), pair.second),
            throwable -> { /* handle errors */}
    );

我的主要问题是:

我想在第一次连接时:读取一定数量的具有读数的特征 属性 并且只有在那之后 订阅那些具有通知的通知属性。这是连接操作。我怎样才能做到这一点?

问题是当我进行读取的第一部分时,observable 读取了那些特征属性,但不会发出 doOnComplete 方法,因为它正在等待更多,所以我无法启动或编写下一个操作订阅和收听更改。

我确实知道具有读取 属性 的特征的数量,但我想以通用方式进行(即,我是否有 7 或 15 个我想要的特征并不重要阅读,我只想全部阅读,写入 pojo 值,然后开始收听通知)。

也许选项是组成一个可观察的对象来计算成功的阅读,然后它开始监听通知。

实现该目标的最佳被动方法是什么?

让你处于这种情况是 the original thread that spawned this question

要实现您想要的效果,您可以采用多种方式。其中之一是:

    final UUID serviceUuid = // your service UUID
    final Map<UUID, byte[]> genericModel = new HashMap<>();
    final Observable<RxBleConnection> connectionObservable = // your connectionObservable

    connectionObservable
            .flatMap( // get the characteristics from the service you're interested in
                    connection -> connection
                            .discoverServices()
                            .flatMap(services -> services
                                    .getService(serviceUuid)
                                    .map(BluetoothGattService::getCharacteristics)
                            ),
                    Pair::new
            )
            .flatMap(connectionAndCharacteristics -> {
                final RxBleConnection connection = connectionAndCharacteristics.first;
                final List<BluetoothGattCharacteristic> characteristics = connectionAndCharacteristics.second;
                return readInitialValues(connection, characteristics)
                        .concatWith(setupNotifications(connection, characteristics));
            })
            .subscribe(
                    pair -> genericModel.put(pair.first.getUuid(), pair.second),
                    throwable -> { /* handle errors */}
            );

其中:

private Observable<Pair<BluetoothGattCharacteristic, byte[]>> readInitialValues(RxBleConnection connection,
                                                                                List<BluetoothGattCharacteristic> characteristics) {
    return Observable.from(characteristics) // deal with every characteristic separately
            .filter(characteristic -> (characteristic.getProperties() & BluetoothGattCharacteristic.PROPERTY_READ) != 0) // filter characteristics that have read property
            .flatMap(connection::readCharacteristic, // read characteristic
                    Pair::new); // merge characteristic with byte[] to keep track from which characteristic the bytes came
}

private Observable<Pair<BluetoothGattCharacteristic, byte[]>> setupNotifications(RxBleConnection connection,
                                                                                List<BluetoothGattCharacteristic> characteristics) {
    return Observable.from(characteristics) // deal with every characteristic separately
            .filter(characteristic -> (characteristic.getProperties() & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) // filter characteristics that have notify property
            .flatMap(characteristic -> connection
                            .setupNotification(characteristic) // setup notification for each
                            .flatMap(observable -> observable), // to get the raw bytes from notification
                    Pair::new); // merge characteristic with byte[] to keep track from which characteristic the bytes came
}

作为 concatWith() 的替代方法,您可以颠倒顺序并使用 startWith()

此致