使用 RxAndroidBle (rxJava) 将命令列表发送到设备

Sending the list of commands to device using RxAndroidBle (rxJava)

我正在尝试通过 rxJava 将命令列表发送到设备。这是我的代码:

public void startWriteCommucation(final ArrayList<byte[]> b) {
    if (isConnected()){
            connectionObservable
                    .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
                        @Override
                        public Observable<Observable<byte[]>> call(final RxBleConnection rxBleConnection) {
                            final List<Observable<byte[]>> list = new ArrayList<>();
                            for (byte[] bytes: b){
                                Log.e("Observer", Arrays.toString(bytes));
                                list.add(rxBleConnection
                                        .writeCharacteristic(BleDevice.characteristicWrite, bytes));
                            }
                            return Observable.from(list);
                        }
                    })
                    .concatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                        @Override
                        public Observable<byte[]> call(Observable<byte[]> observable) {
                            return observable;
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<byte[]>() {
                        @Override
                        public void call(byte[] bytes) {
                            view.setTextStatus("Write success");
                            Log.e("Subscriber", Arrays.toString(bytes));
                        }
                    });
        }
}

有效,然后我单击按钮一次。比如我clikc的方法:

 public void onClick(){
        ArrayList<byte[]> listCmd = new ArrayList<>();
        listCmd.add(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        listCmd.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
        startWriteCommucation(listCmd);
}

和 LogCat 中的 myLogs:

E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

但是当我使用快速双击按钮时出现问题。然后第一次点击 observable 仍然有效,我再次点击再次调用 startWriteCommunication 方法。在此之后我的日志看起来是这样的:

 E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

 E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

主要问题是它们不正常,而且我的设备工作不正常。你能帮忙找出问题吗?

问题是 RxAndroidBle 库错误(导致响应与请求不匹配)和共享两个有状态通信流之间的连接(需要按顺序进行两次写入,中间没有任何通信)。

错误:应该写入 BluetoothGattCharacteristic 的值 (byte[]) 设置得太早了。如果有两个并行写入器用于相同的特性——其中一个可以覆盖由于竞争条件而由另一个设置的 byte[]。我已经对库进行了修复,目前正在进行代码审查,应该会在不久的将来应用于 SNAPSHOT 版本。

更改后的输出将如下所示:

D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

可能的解决方案

如果您不想在用户快速点击按钮两次的情况下触发两次流 - 您可以创建一个可共享的流:

Observable<byte[]> theSharedFlow = rxBleConnection
  .writeCharacteristic(uuid, data1)
  .flatMap(writtenBytes -> rxBleConnection.writeCharacteristic(uuid, data2))
  .share()

多次订阅时只会执行一次直到完成。在上面的代码片段中,第二个 writeCharacteristic() 将在第一个发出写入的字节后被订阅(并排队等待通信)。

如果应用程序打算在共享连接的同时在任意时间按顺序发送任意命令集,则应用程序需要确保前一组命令已完成。

希望我已经回答了你的问题。如果您愿意提供有关用例的更多信息,我会尝试改进我的答案。

此致

编辑:

备选方案:

为了保留顺序,需要订阅所有 Observables,以便它们到达。 Observable 的合约是 Observable(如果是冷的)直到订阅才会执行。当使用 flatMap() 时,第二个 Observable 在第一个 Observable 发出后被订阅。

为了让两个写入都被传输以便它们必须以相同的顺序被订阅,所以流程可能看起来像这样:

connectionObservable
            .flatMap(rxBleConnection -> {
                Observable<byte[]> mergedObservable = null;
                for (byte[] bytes : b) {
                    Log.d("Observer", Arrays.toString(bytes));
                    final Observable<byte[]> writeObservable = rxBleConnection
                            .writeCharacteristic(uuid, bytes);

                    if (mergedObservable == null) {
                        mergedObservable = writeObservable;
                    } else {
                        // merging two Observables to be subscribed at the same time when subscribed
                        mergedObservable = mergedObservable.mergeWith(writeObservable);
                    }
                }
                return mergedObservable;
            })
            // removed .concatMap()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    bytes -> Log.d("Subscriber", Arrays.toString(bytes)),
                    throwable -> Log.e("Subscriber", "error", throwable)
            );

RxJava 显然有更多的方法来实现相同的行为,但这不是这个问题的一部分。