从同一个 Observable (RxAndroidBle) 观察多次

Observe many times from same Observable (RxAndroidBle)

我正在使用 RxAndroidBle 库和 RxJava2 来读取 BLE 特性。我认为这个问题只是一个 RxJava 问题,但包括我正在使用的细节 RxAndroidBle 以防有用。

我获得连接,然后用它来调用 readCharacteristic(),它本身 returns 一个 Single<ByteArray>。在这一点上,我不只是想得到一个 ByteArray。这个特性需要多次读取,因为BLE设备设置为让我返回一个小文件,而特性一次只能返回20个字节,因此我需要重复读取。

是否可以修改此代码,使 returns 下面的 switchMap()Observable 会发出许多 ByteArrays,而不是只发出一个?

我是 RxJava 新手。

val connection: Observable<RxBleConnection> = selectedDevice.record.bleDevice.establishConnection(false, Timeout(30, TimeUnit.SECONDS))
return connection
  .subscribeOn(Schedulers.io())
  .switchMap {
    // I want to get an Observable that can read multiple times here.
    it.readCharacteristic(serverCertCharacteristicUUID).toObservable()
  }
  .doOnNext {
    Timber.e("Got Certificate bytes")
  }
  .map {
    String(it as ByteArray)
  }
  .doOnNext {
    Timber.e("Got certificate: $it")
  }
  .singleOrError()

您可以使用通知来缓冲您的数据。

device.establishConnection(false)
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(characteristicUuid))
.flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
.subscribe(
    bytes -> {
        // Given characteristic has been changes, here is the value.
    },
    throwable -> {
        // Handle an error here.
    }
);

要重复多次读取直到发出特定值,需要更改此部分:

// I want to get an Observable that can read multiple times here.
it.readCharacteristic(serverCertCharacteristicUUID).toObservable()

类似于 suggested by the RxJava author in the first answer that google gives for phrase rxjava single repeat:

// this will repeat until a `checkRepeatIf` returns false
Observable.defer {
  val successValue = AtomicReference<ByteArray>()
  connection.readCharacteristic(serverCertCharacteristicUUID)
    .doOnSuccess { successValue.lazySet(it) }
    .repeatWhen { completes -> completes.takeWhile { checkRepeatIf(successValue.get()) } }
}

我可以通过发送一个信号来停止 connectionObservable 和蓝牙特性的读取来使它正常工作。值得注意的是你需要在 repeat() 之后调用 toObservable() 否则这不起作用,虽然我不知道为什么。

override fun readMultipartCharacteristic(macAddress: String): Single<String> {
  val CERTIFICATE_TERMINATOR = 0x30.toByte()

  val device = bluetoothService.getBleDevice(macAddress)
  if (connectionObservable == null || !device.connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED)) {
    connectionObservable = device.establishConnection(false, Timeout(30, TimeUnit.SECONDS))
  }

  val stop: PublishSubject<Unit> = PublishSubject.create()
  return connectionObservable!!
      .subscribeOn(Schedulers.io())
      .takeUntil(stop)
      .switchMap {
        it.readCharacteristic(UUID("my-uuid"))
            .repeat()
            .toObservable()
            .takeUntil(stop)
      }
      .collectInto(ByteArrayOutputStream(), { buffer, byteArray ->
        // Watch for the signal of the end of the stream
        if (byteArray.size == 1 && byteArray.get(0).equals(CERTIFICATE_TERMINATOR)) {
          stop.onComplete()
        } else {
          buffer.write(byteArray)
        }
      })
      .map {
        String(it.toByteArray())
      }
}