阻塞 RxAndroidBle 写操作

Blocking RxAndroidBle write operation

如何使用 RxAndroidBle 在 Android 中执行阻塞写操作。只有写操作成功才应该执行下一个命令。

protected void doWriteBytes(UUID characteristic, byte[] bytes) {
    final Disposable disposable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR)
            .subscribe(
                    value -> {
                        Timber.d("Write characteristic %s: %s",
                                BluetoothGattUuid.prettyPrint(characteristic),
                                byteInHex(value));
                    },
                    throwable -> onError(throwable)
            );

    compositeDisposable.add(disposable);
}


protected void test() {
  // blocking write bytes
  doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});

  // following command should be only performed if doWriteBytes is successful executed
  foo();

  // blocking write bytes
  doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});

  bar();
}

我知道 subscribe 和 onComplete 但是没有这些方法也可以吗?

背景是我想在几个不同的 sub类 中覆盖测试方法,所以我可以执行各种 doWriteBytes 命令(例如 ACK 命令)来向蓝牙设备发送一些字节,但我需要确保仅在 ACK 命令发送成功后才执行下一个命令。

也许它更像是一个 RxJava2 问题,但我不太熟悉它。

编辑:

感谢@Dariusz Seweryn 的回答。对不起,我的问题可能不是很清楚。我会尽量具体化。

我想像 test() 中的普通函数一样编写源代码来抽象 RxJava2 实现。唯一不同的是 doWriteBytes 和其他蓝牙操作(通知、读取)应该通过 RxAndroidBle 完成。 我必须写入蓝牙设备的内容取决于通知字节或 test() 方法中的某些其他算法。此外,我想重写 test() 方法来为完全不同的蓝牙设备实现不同的蓝牙通信流程。顺序处理蓝牙操作始终很重要。

现在我想到了三个想法:

1) 我的第一个想法是实现所有 RxAndroidBle 操作阻塞,所以我可以使用简单的,例如循环。

2) 我的第二个想法是在运行时动态添加 (concat?) 观察到 test() 方法中的另一个顺序处理但我总是需要来自先前观察的 return 值?

3) 我的第三个想法是将write/notify/write操作组合成一个方法,我可以在test()方法中调用它。操作应该是将字节写入特征 A,然后等待特征 B 的通知,对接收到的字节进行一些处理,然后再次写入特征 C。 但是在添加的test()方法中写了什么或者通知过程在运行时应该是动态的。

也许在 RxJava2 中有一个优雅的解决方案,或者根本不可能?

编辑2:

我尝试实现所有三个想法,但不幸的是我没有成功。

1)

connectionObservable
                .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .retry(BT_RETRY_TIMES_ON_ERROR)
                .blockingSubscribe(
                        value -> {
                            Timber.d("Write characteristic %s: %s",
                                    BluetoothGattUuid.prettyPrint(characteristic),
                                    byteInHex(value));
                            processBtQueue();
                        },
                        throwable -> onError(throwable)
                );

即使成功也总是阻塞?我必须在某个地方发布它吗?另外,方法 return void 不再是一次性的,但我无法处理它。

2) 我正在为这个想法苦苦挣扎。如果我不知道起始 Observable,我应该连接到哪个 Observable? connectionObserable 不起作用,因为它持有 RxBleConnection。第二个问题是蓝牙操作后的值是 Java Object 类!?我必须每次都施放它吗?你有没有一个例子,我可以如何将蓝牙写入操作连接到通知蓝牙结果?

3) 这个想法的问题是我不知道如何在运行时将处理部分动态添加到 RxJava 订阅部分之外的通知?

我有一个想法 nr 3 的工作解决方案

protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
    Observable observable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR)
        .flatMap( writeBytes -> notificationObservable)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR);

    compositeDisposable.add(observable.subscribe());

    return observable;
}

顺便说一句。我应该针对这些问题在 Whosebug 上创建单独的线程吗?

如果有帮助,可以找我的实验源码here

I know subscribe and onComplete but it is also possible to do without these methods?

给定:

Completable foo() { ... }

Completable bar() { ... )

一个人可以做到:

Disposable testDisposable = connectionObservable
                                .flatMapCompletable(connection ->
                                    connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12}).ignoreElement()
                                        .andThen(foo())
                                        .andThen(connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1}).ignoreElement())
                                        .andThen(bar())
                                )
                                .subscribe(
                                     () -> { /* completed */ },
                                     throwable -> { /* error happened */ }
                                )

用一个 .subscribe() 关闭上面的内容,就可以控制流将完成的连接数。在上面的示例中,如果连接在第一次写入期间过早结束——所有后续操作(foo()、写入、bar())将根本不会发生。

编辑:

您的所有想法都可能奏效 — 您可以尝试一下。

Maybe there is an elegant solution for my problem in RxJava2 or it is not possible at all?

Observable/Single/Completable 类 有 .blocking*() 个功能,如果您确实需要它们。不过要小心——由于向应用程序引入了更多状态,您可能会开始遇到一些难以调试的问题,具体取决于您的实现。