使用 RxJava flowable 处理 Android 房间查询时提高性能

Improving performance when using RxJava flowable to handle Android Room queries

我的应用正在以尽可能高的采样率(在我的设备上约为 200 Hz)从加速度计收集传感器值,并将这些值保存在 Room 数据库中。我还想经常用最新的测量值更新一些图表,比如说每秒 5 次的刷新率。自从该应用程序还收集线性加速度(没有 g)也有 ~200 Hz(所以两个传感器每个大约 200Hz 将值插入数据库)我注意到应用程序性能大幅下降并且我有几秒钟的滞后在收集的加速度值和它们在图中显示之间。 从探查器我的猜测是 RxComputationThread 是瓶颈,因为由于 Flowables,它几乎一直处于活动状态。

我使用 sample() 来限制接收器更新,因为我的图表不需要经常更新。当我只收集一个传感器时,这导致了 acceptable 性能。我看到 RxJava 提供了一个 interval() 方法来限制发射端的发射频率,但我似乎不可用? (未解决的参考)。

也许有人知道如何提高性能?总的来说,我喜欢 RxJava 和 Room 的概念,并愿意坚持使用它们,但我在这一点上几乎停滞不前。

这是我用来观察房间 SQL table 并更新图表的代码:

// Observe changes to the datasource and create a new subscription if necessary
sharedViewModel.dataSource.observe(viewLifecycleOwner, Observer { source ->
    Log.d("TAG", "Change observed!")
    when (source) {
        "acc" -> {
            val disposableDataSource =
                sharedViewModel.lastSecondsAccelerations
                    .sample(200, TimeUnit.MILLISECONDS)
                    .onBackpressureDrop()
                    .subscribeOn(Schedulers.io())
                    .subscribe { lastMeasurements ->
                        Log.d("TAG", Thread.currentThread().name)
                        if (sharedViewModel.isReset.value == true && lastMeasurements.isNotEmpty()) {
                            val t =
                                lastMeasurements.map { (it.time.toDouble() * 1e-9) - (lastMeasurements.last().time.toDouble() * 1e-9) }
                            val accX = lastMeasurements.map { it.accX.toDouble() }
                            val accY = lastMeasurements.map { it.accY.toDouble() }
                            val accZ = lastMeasurements.map { it.accZ.toDouble() }

                            // Update plots
                            updatePlots(t, accX, accY, accZ)
                        }
                    }
            compositeDisposable.clear()
            compositeDisposable.add(disposableDataSource)
        }
        "lin_acc" -> {
            val disposableDataSource =
                sharedViewModel.lastSecondsLinAccelerations
                    .sample(200, TimeUnit.MILLISECONDS)
                    .onBackpressureDrop()
                    .subscribeOn(Schedulers.io())
                    .subscribe { lastMeasurements ->
                        Log.d("TAG", Thread.currentThread().name)
                        if (sharedViewModel.isReset.value == true && lastMeasurements.isNotEmpty()) {
                            val t =
                                lastMeasurements.map { (it.time.toDouble() * 1e-9) - (lastMeasurements.last().time.toDouble() * 1e-9) }
                            val accX = lastMeasurements.map { it.accX.toDouble() }
                            val accY = lastMeasurements.map { it.accY.toDouble() }
                            val accZ = lastMeasurements.map { it.accZ.toDouble() }

                            // Update plots
                            updatePlots(t, accX, accY, accZ)
                        }
                    }
            compositeDisposable.clear()
            compositeDisposable.add(disposableDataSource)
        }
    }
})

获取最近 10 秒测量值的查询

@Query("SELECT * FROM acc_measurements_table WHERE time > ((SELECT MAX(time) from acc_measurements_table)- 1e10)")
fun getLastAccelerations(): Flowable<List<AccMeasurement>>

感谢您的评论,我现在想通了,瓶颈是什么。问题是大量的插入调用,这并不奇怪。但是可以通过使用某种缓冲区一次插入多行来提高性能。

这是我添加的,以防有人遇到同样的情况:

class InsertHelper(private val repository: Repository){
    var compositeDisposable = CompositeDisposable()

    private val measurementListAcc: FlowableList<AccMeasurement> = FlowableList()
    private val measurementListLinAcc: FlowableList<LinAccMeasurement> = FlowableList()

    fun insertAcc(measurement: AccMeasurement) {
        measurementListAcc.add(measurement)
    }
    fun insertLinAcc(measurement: LinAccMeasurement) {
        measurementListLinAcc.add(measurement)
    }

    init {
        val disposableAcc = measurementListAcc.subject
            .buffer(50)
            .subscribe {measurements ->
                GlobalScope.launch {
                    repository.insertAcc(measurements)
                }
                measurementListAcc.remove(measurements as ArrayList<AccMeasurement>)
            }

        val disposableLinAcc = measurementListLinAcc.subject
            .buffer(50)
            .subscribe {measurements ->
                GlobalScope.launch {
                    repository.insertLinAcc(measurements)
                }
                measurementListLinAcc.remove(measurements as ArrayList<LinAccMeasurement>)
            }

        compositeDisposable.add(disposableAcc)
        compositeDisposable.add(disposableLinAcc)
    }
}
// Dynamic list that can be subscribed on
class FlowableList<T> {
    private val list: MutableList<T> = ArrayList()
    val subject = PublishSubject.create<T>()

    fun add(value: T) {
        list.add(value)
        subject.onNext(value)
    }

    fun remove(value: ArrayList<T>) {
        list.removeAll(value)
    }
}

我基本上是用一个动态列表来缓冲几十个测量样本,然后将它们作为一个整体插入到房间数据库中,然后从动态列表中删除它们。这里还有一些信息,为什么批量插入更快:https://hackernoon.com/squeezing-performance-from-sqlite-insertions-with-room-d769512f8330

我对 Android 开发还很陌生,所以如果您看到一些错误或有任何建议,我很感激每条评论:)