Room RxJava observable 在插入时触发多次

Room RxJava observable triggered multiple times on insert

我的存储库实现有一个奇怪的问题。每次我调用应该从数据库获取数据并通过网络调用更新数据库的函数时,我都会从数据库观察器收到多个结果。

override fun getApplianceControls(
    serialNumber: SerialNumber
): Flowable<ApplianceControlState> {
    val subject = BehaviorProcessor.create<ApplianceControlState>()

    controlsDao.get(serialNumber.serial)
        .map { controls ->
            ApplianceControlState.Loaded(controls.toDomainModel())
        }
        .subscribe(subject)

    controlApi.getApplianceControls(serialNumber.serial)
        .flatMapObservable<ApplianceControlState> { response ->
            val entities = response.toEntity(serialNumber)
            // Store the fetched controls on the database.
            controlsDao.insert(entities).andThen(
                // Return an empty observable because the db will take care of emitting latest values.
                Observable.create { }
            )
        }
        .onErrorResumeNext { error: Throwable ->
            Observable.create { emitter -> emitter.onNext(ApplianceControlState.Error(error)) }
        }
        .subscribeOn(backgroundScheduler)
        .subscribe()


    return subject.distinctUntilChanged()
}
@Dao
interface ApplianceControlsDao {

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    fun insert(controls: List<TemperatureControlEntity>): Completable

    @Query("SELECT * FROM control_temperature WHERE serial = :serial")
    fun get(serial: String): Flowable<List<TemperatureControlEntity>>
}

基本上,如果我调用 getApplianceControls 一次,我就会得到想要的结果。然后我再次调用,使用另一个序列号,它是空的,我得到空数组。 但是我第三次调用,但使用与第一次相同的序列号,在插入调用后我得到了正确结果和空数组的混合。

像这样:

第一次调用,序列号“123”-> Loaded([control1, control2, control3])

第二次调用,序列号“000”-> Loaded([])

第三次调用,序列号“123”-> Loaded([control1, control2, control3]), Loaded([]), Loaded([control1, control2, control3])

如果我从 api 响应中删除数据库插入,它工作正常。在 insert 被调用后,所有奇怪的事情都发生了。

编辑:getApplianceControls() 是从 ViewModel 调用的。

fun loadApplianceControls(serialNumber: SerialNumber) {
    Log.i("Loading appliance controls")

    applianceControlRepository.getApplianceControls(serialNumber)
        .subscribeOn(backgroundScheduler)
        .observeOn(mainScheduler)
        .subscribeBy(
            onError = { error ->
                Log.e("Error $error")
            },
            onNext = { controlState ->
                _controlsLiveData.value = controlState  
            }
        ).addTo(disposeBag)
}

正如我在评论中提到的那样,您有 2 个未在任何地方取消订阅的订阅,这可能会导致内存泄漏(处置主题时不会处置),而且通过这种实现,您可以忽略 API 错误。 我会尝试将其更改为:

override fun getApplianceControls(serialNumber: SerialNumber): Flowable<ApplianceControlState> {

    val dbObservable = controlsDao.get(serialNumber.serial)
        .map { controls ->
            ApplianceControlState.Loaded(controls.toDomainModel())
        }

    val apiObservable = controlApi.getApplianceControls(serialNumber.serial)
        .map { response ->
            val entities = response.toEntity(serialNumber)
           // Store the fetched controls on the database.
           controlsDao.insert(entities).andThen( Unit )
        }
        .toObservable()
        .startWith(Unit)

    return Observables.combineLatest(dbObservable, apiObservable) { dbData, _ -> dbData }
        // apiObservable emits are ignored, but it will by subscribed with dbObservable and Errors are not ignored 
        .onErrorResumeNext { error: Throwable ->
            Observable.create { emitter -> emitter.onNext(ApplianceControlState.Error(error)) }
        }
        .subscribeOn(backgroundScheduler)
        //observeOn main Thread
        .distinctUntilChanged()
}

我不确定它是否解决了原来的问题。但如果是这样 - 问题出在 flatMapObservable 还有助于查看 controlApi.getApplianceControls() 实施。