我在活动之间移动和使用服务时从蓝牙丢失数据

I loose data from Bluetooth moving between activities and using Services

我正在构建一个通过蓝牙 spp 连接与外部设备交换数据的应用程序。在这里搜索了一下如何维护活动之间的连接后,我发现更好的方法是实现一个服务并将其绑定到活动。我认为我写的代码是正确的,但是当我从第一个 activity 移动到第二个 activity 并向外部设备发送请求时,我从读取缓冲区中丢失了一些答案消息。如果我在发出请求之前等待大约 5 秒,它就可以正常工作。

这是我的 类,我正在使用名为 BlueFlow:

的蓝牙库
class BluetoothService : Service() {

    private val btChannel: BtChannel by inject()

    inner class LocalBinder: Binder() {
        val bindService = this@BluetoothService
    }

    override fun onBind(intent: Intent?): IBinder {
        return LocalBinder()
    }

    fun sendRequest(data: ByteArray): Boolean {
        return btChannel.send(data)
    }

    fun readChannel(): LiveData<ByteArray>? {
        return btChannel.read()?.cancellable()?.asLiveData()
    }
}

class BtChannel(private val blueFlow: BlueFlow) {

    fun read(): Flow<ByteArray>? {
        return blueFlow.getIO()?.readByteArrayStream()
    }

    fun send(data: ByteArray): Boolean {
        return blueFlow.getIO()?.send(data) ?: false
    }

}

// This is the readByteArrayStream function from the blueFlow library
@ExperimentalCoroutinesApi
    fun readByteArrayStream(
        delayMillis: Long = 1000,
        minExpectedBytes: Int = 2,
        bufferCapacity: Int = 1024,
        readInterceptor: (ByteArray) -> ByteArray? = { it }
    ): Flow<ByteArray> = channelFlow {

        if (inputStream == null) {
            throw NullPointerException("inputStream is null. Perhaps bluetoothSocket is also null")
        }
        val buffer = ByteArray(bufferCapacity)
        val byteAccumulatorList = mutableListOf<Byte>()
        while (isActive) {
            try {
                if (inputStream.available() < minExpectedBytes) {
                    delay(delayMillis)
                    continue
                }
                val numBytes = inputStream.read(buffer)
                val readBytes = buffer.trim(numBytes)
                if (byteAccumulatorList.size >= bufferCapacity)
                    byteAccumulatorList.clear()

                byteAccumulatorList.addAll(readBytes.toList())
                val interceptor = readInterceptor(byteAccumulatorList.toByteArray())

                if (interceptor == null)
                    delay(delayMillis)

                interceptor?.let {
                    offer(it)
                    byteAccumulatorList.clear()
                }

            } catch (e: IOException) {
                byteAccumulatorList.clear()
                closeConnections()
                error("Couldn't read bytes from flow. Disconnected")
            } finally {
                if (bluetoothSocket?.isConnected != true) {
                    byteAccumulatorList.clear()
                    closeConnections()
                    break
                }
            }
        }
    }.flowOn(Dispatchers.IO)


class FirstActivity: AppCompatActivity() {

  private lateinit var viewModel: FirstViewModel
  private lateinit var mBluetoothService: BluetoothService
  private var mBound = false

  private val serviceConnection: ServiceConnection = object : ServiceConnection {
        override fun onServiceConnected(name: ComponentName?, service: IBinder?) {
            val binder = service as BluetoothService.LocalBinder
            mBluetoothService = binder.bindService
            mBound = true
        }

        override fun onServiceDisconnected(name: ComponentName?) {
            mBound = false
        }
    }

  override fun onStart() {
      super.onStart()
      startBluetoothService()
      setObservers()
  }

  override fun onStop() {
      super.onStop()
      unbindService(serviceConnection)
      mBound = false
      removeObservers()
    }

   private fun startBluetoothService() {
    val intent = Intent(this, BluetoothService::class.java)
    bindService(intent, serviceConnection, Context.BIND_AUTO_CREATE)
   }

    private fun setObservers() {
        viewModel.hours.observe(this) {hoursTv?.text = it }
        mBluetoothService?.readChannel()?.observe(this, { viewModel.getData(it) })
    }

    private fun removeObservers() {
        viewModel.hours.removeObservers(this@FirstActivity)
        mBluetoothService?.readChannel()?.removeObservers(this@FirstActivity)
    }
}

class FirstViewModel: ViewModel() {

  val hours: MutableLiveData<String> by lazy {
      MutableLiveData<String>()
  }

  fun getData(bytes: ByteArray) = viewModelScope.launch {
        getDataSafeCall(bytes)
  }

    @ExperimentalUnsignedTypes
  private fun getDataSafeCall(bytes: ByteArray) {
      PacketFrame.decodePacket(bytes.toUByteArray(), bytes.size).onEach { updateData(it) }.launchIn(viewModelScope)
  }

  fun updateData(packet: HashMap<PacketHeader, UByteArray>) {
            packet.forEach { packet ->
                when (packet.key) {
                    PacketHeader.Hour -> {
                        hours.value = PacketFrame.payloadToString(packet.value)
                    }
                    else -> {
                    }
                }
            }
    }
}

class SecondActivity: AppCompatActivity() {
  //same as FirstActivity
}

class SecondViewModel: ViewModel() {
  // same as FirstViewModel
}

PacketFrame 是单例。我用它来组合使用自定义协议从蓝牙接收的数据包。

你有什么建议吗?

更新:

我正在阅读文档,试图找出问题出在哪里,我发现了这个:

行云流水 流是类似于序列的冷流 — 流生成器中的代码在收集流之前不会运行。

但我也发现了这个:

channelFlow 创建一个冷流实例,其中包含发送到通过 ProducerScope 提供给构建器代码块的 SendChannel 的元素。它允许元素由 运行ning 在不同上下文中或同时生成的代码生成。结果流是冷的,这意味着每次将终端运算符应用于结果流时都会调用该块。 这个构建器确保线程安全和上下文保存,因此提供的 ProducerScope 可以在不同的上下文中同时使用。一旦块中的代码及其所有子项完成,生成的流程就会完成。

我的理解是,channelFlow(用于库中的 readByteArrayStream 函数)继续 运行,直到消费者还活着并请求元素。

在我的代码中,我在 ViewModel 中启动协程,当我调用第二个 Activity 时它没有被清除,因为它仍然在堆栈中,所以第一个 ViewModel 上的函数继续从蓝牙接收数据直到该流在第二个 ViewModel 中创建,并使用观察它的 LiveData 使用它。

你怎么看?关于如何解决有什么建议吗?

我会尝试提出一些建议,尽管很难知道您想要实现什么以及为什么要在读数中间启动一个新的 activity。

我建议将您的 readChannel() return 类型更改为 SharedFlow 并使用 shareIn() 调用将 readByteArrayStream() 函数转换为 SharedFlow。然后按原样收集而不将其转换为实时数据。该库是在引入 StateFlowSharedFlow 之前构建的,因此在这种情况下这些新的流类型可能会派上用场。

另一种方法可能是再次使用 SharedFlow,它将充当事件总线并重新发出由 readByteArrayStream() 收集的数据,请参阅 here 以了解此类 EventBus 的示例实施。

目前我只能提供这些建议。我真的不确定你的用例,如果可以更好地理解,我想检查一个链接到相关 issue 的示例项目。