阻塞 RxAndroidBle 写操作
Blocking RxAndroidBle write operation
如何使用 RxAndroidBle 在 Android 中执行阻塞写操作。只有写操作成功才应该执行下一个命令。
protected void doWriteBytes(UUID characteristic, byte[] bytes) {
final Disposable disposable = connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.subscribe(
value -> {
Timber.d("Write characteristic %s: %s",
BluetoothGattUuid.prettyPrint(characteristic),
byteInHex(value));
},
throwable -> onError(throwable)
);
compositeDisposable.add(disposable);
}
protected void test() {
// blocking write bytes
doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});
// following command should be only performed if doWriteBytes is successful executed
foo();
// blocking write bytes
doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});
bar();
}
我知道 subscribe 和 onComplete 但是没有这些方法也可以吗?
背景是我想在几个不同的 sub类 中覆盖测试方法,所以我可以执行各种 doWriteBytes 命令(例如 ACK 命令)来向蓝牙设备发送一些字节,但我需要确保仅在 ACK 命令发送成功后才执行下一个命令。
也许它更像是一个 RxJava2 问题,但我不太熟悉它。
编辑:
感谢@Dariusz Seweryn 的回答。对不起,我的问题可能不是很清楚。我会尽量具体化。
我想像 test() 中的普通函数一样编写源代码来抽象 RxJava2 实现。唯一不同的是 doWriteBytes 和其他蓝牙操作(通知、读取)应该通过 RxAndroidBle 完成。
我必须写入蓝牙设备的内容取决于通知字节或 test() 方法中的某些其他算法。此外,我想重写 test() 方法来为完全不同的蓝牙设备实现不同的蓝牙通信流程。顺序处理蓝牙操作始终很重要。
现在我想到了三个想法:
1) 我的第一个想法是实现所有 RxAndroidBle 操作阻塞,所以我可以使用简单的,例如循环。
2) 我的第二个想法是在运行时动态添加 (concat?) 观察到 test() 方法中的另一个顺序处理但我总是需要来自先前观察的 return 值?
3) 我的第三个想法是将write/notify/write操作组合成一个方法,我可以在test()方法中调用它。操作应该是将字节写入特征 A,然后等待特征 B 的通知,对接收到的字节进行一些处理,然后再次写入特征 C。
但是在添加的test()方法中写了什么或者通知过程在运行时应该是动态的。
也许在 RxJava2 中有一个优雅的解决方案,或者根本不可能?
编辑2:
我尝试实现所有三个想法,但不幸的是我没有成功。
1)
connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.blockingSubscribe(
value -> {
Timber.d("Write characteristic %s: %s",
BluetoothGattUuid.prettyPrint(characteristic),
byteInHex(value));
processBtQueue();
},
throwable -> onError(throwable)
);
即使成功也总是阻塞?我必须在某个地方发布它吗?另外,方法 return void 不再是一次性的,但我无法处理它。
2) 我正在为这个想法苦苦挣扎。如果我不知道起始 Observable,我应该连接到哪个 Observable? connectionObserable 不起作用,因为它持有 RxBleConnection。第二个问题是蓝牙操作后的值是 Java Object 类!?我必须每次都施放它吗?你有没有一个例子,我可以如何将蓝牙写入操作连接到通知蓝牙结果?
3) 这个想法的问题是我不知道如何在运行时将处理部分动态添加到 RxJava 订阅部分之外的通知?
我有一个想法 nr 3 的工作解决方案
protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
Observable observable = connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.flatMap( writeBytes -> notificationObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR);
compositeDisposable.add(observable.subscribe());
return observable;
}
顺便说一句。我应该针对这些问题在 Whosebug 上创建单独的线程吗?
如果有帮助,可以找我的实验源码here。
I know subscribe and onComplete but it is also possible to do without these methods?
给定:
Completable foo() { ... }
Completable bar() { ... )
一个人可以做到:
Disposable testDisposable = connectionObservable
.flatMapCompletable(connection ->
connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12}).ignoreElement()
.andThen(foo())
.andThen(connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1}).ignoreElement())
.andThen(bar())
)
.subscribe(
() -> { /* completed */ },
throwable -> { /* error happened */ }
)
用一个 .subscribe()
关闭上面的内容,就可以控制流将完成的连接数。在上面的示例中,如果连接在第一次写入期间过早结束——所有后续操作(foo()
、写入、bar()
)将根本不会发生。
编辑:
您的所有想法都可能奏效 — 您可以尝试一下。
Maybe there is an elegant solution for my problem in RxJava2 or it is not possible at all?
Observable
/Single
/Completable
类 有 .blocking*()
个功能,如果您确实需要它们。不过要小心——由于向应用程序引入了更多状态,您可能会开始遇到一些难以调试的问题,具体取决于您的实现。
如何使用 RxAndroidBle 在 Android 中执行阻塞写操作。只有写操作成功才应该执行下一个命令。
protected void doWriteBytes(UUID characteristic, byte[] bytes) {
final Disposable disposable = connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.subscribe(
value -> {
Timber.d("Write characteristic %s: %s",
BluetoothGattUuid.prettyPrint(characteristic),
byteInHex(value));
},
throwable -> onError(throwable)
);
compositeDisposable.add(disposable);
}
protected void test() {
// blocking write bytes
doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});
// following command should be only performed if doWriteBytes is successful executed
foo();
// blocking write bytes
doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});
bar();
}
我知道 subscribe 和 onComplete 但是没有这些方法也可以吗?
背景是我想在几个不同的 sub类 中覆盖测试方法,所以我可以执行各种 doWriteBytes 命令(例如 ACK 命令)来向蓝牙设备发送一些字节,但我需要确保仅在 ACK 命令发送成功后才执行下一个命令。
也许它更像是一个 RxJava2 问题,但我不太熟悉它。
编辑:
感谢@Dariusz Seweryn 的回答。对不起,我的问题可能不是很清楚。我会尽量具体化。
我想像 test() 中的普通函数一样编写源代码来抽象 RxJava2 实现。唯一不同的是 doWriteBytes 和其他蓝牙操作(通知、读取)应该通过 RxAndroidBle 完成。 我必须写入蓝牙设备的内容取决于通知字节或 test() 方法中的某些其他算法。此外,我想重写 test() 方法来为完全不同的蓝牙设备实现不同的蓝牙通信流程。顺序处理蓝牙操作始终很重要。
现在我想到了三个想法:
1) 我的第一个想法是实现所有 RxAndroidBle 操作阻塞,所以我可以使用简单的,例如循环。
2) 我的第二个想法是在运行时动态添加 (concat?) 观察到 test() 方法中的另一个顺序处理但我总是需要来自先前观察的 return 值?
3) 我的第三个想法是将write/notify/write操作组合成一个方法,我可以在test()方法中调用它。操作应该是将字节写入特征 A,然后等待特征 B 的通知,对接收到的字节进行一些处理,然后再次写入特征 C。 但是在添加的test()方法中写了什么或者通知过程在运行时应该是动态的。
也许在 RxJava2 中有一个优雅的解决方案,或者根本不可能?
编辑2:
我尝试实现所有三个想法,但不幸的是我没有成功。
1)
connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.blockingSubscribe(
value -> {
Timber.d("Write characteristic %s: %s",
BluetoothGattUuid.prettyPrint(characteristic),
byteInHex(value));
processBtQueue();
},
throwable -> onError(throwable)
);
即使成功也总是阻塞?我必须在某个地方发布它吗?另外,方法 return void 不再是一次性的,但我无法处理它。
2) 我正在为这个想法苦苦挣扎。如果我不知道起始 Observable,我应该连接到哪个 Observable? connectionObserable 不起作用,因为它持有 RxBleConnection。第二个问题是蓝牙操作后的值是 Java Object 类!?我必须每次都施放它吗?你有没有一个例子,我可以如何将蓝牙写入操作连接到通知蓝牙结果?
3) 这个想法的问题是我不知道如何在运行时将处理部分动态添加到 RxJava 订阅部分之外的通知?
我有一个想法 nr 3 的工作解决方案
protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
Observable observable = connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.flatMap( writeBytes -> notificationObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR);
compositeDisposable.add(observable.subscribe());
return observable;
}
顺便说一句。我应该针对这些问题在 Whosebug 上创建单独的线程吗?
如果有帮助,可以找我的实验源码here。
I know subscribe and onComplete but it is also possible to do without these methods?
给定:
Completable foo() { ... }
Completable bar() { ... )
一个人可以做到:
Disposable testDisposable = connectionObservable
.flatMapCompletable(connection ->
connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12}).ignoreElement()
.andThen(foo())
.andThen(connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1}).ignoreElement())
.andThen(bar())
)
.subscribe(
() -> { /* completed */ },
throwable -> { /* error happened */ }
)
用一个 .subscribe()
关闭上面的内容,就可以控制流将完成的连接数。在上面的示例中,如果连接在第一次写入期间过早结束——所有后续操作(foo()
、写入、bar()
)将根本不会发生。
编辑:
您的所有想法都可能奏效 — 您可以尝试一下。
Maybe there is an elegant solution for my problem in RxJava2 or it is not possible at all?
Observable
/Single
/Completable
类 有 .blocking*()
个功能,如果您确实需要它们。不过要小心——由于向应用程序引入了更多状态,您可能会开始遇到一些难以调试的问题,具体取决于您的实现。