RxAndroid,如何检测可观察对象是否已完成发射

RxAndroid, How to detect if observable has finished emission

我正在编写以下代码片段以从 firebase 数据库中获取已保存食物的列表,然后使用该列表,我再次从 firebase 数据库中获取单个食物的详细信息。

以下代码工作正常,除了我无法弄清楚如何让第二个 flatMap 知道第一个 flatMap 的发射已经完成(所有食物列表都已处理)。所以我无法调用 onCompleted() 方法,因此无法检测到整个过程何时完成。

查看以下代码段中的评论:

Observable.create<List<PersonalizedFood>> {

            FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener {
                override fun onCancelled(p0: DatabaseError?) {

                }

                override fun onDataChange(p0: DataSnapshot?) {
                    val list = ArrayList<PersonalizedFood>()
                    p0?.let {
                        for (dateObject in p0.children) {
                            for (foodItem in dateObject.children) {
                                val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood
                                list.add(PersonalizedFood(food))
                            }
                        }
                    }
                    it.onNext(list)
                    it.onCompleted()
                }
            })
        }.subscribeOn(Schedulers.io()).flatMap {
            Observable.from(it) // returning a Observable that emits items of list ("it" is the list here) 
        }.observeOn(Schedulers.io()).flatMap {
        // How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called.
            personalizedFood ->

            Observable.create<Boolean>{
                FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{
                    override fun onCancelled(p0: DatabaseError?) {
                        it.onError(p0?.toException())
                    }

                    override fun onDataChange(p0: DataSnapshot?) {
                        if(p0 != null) {
                            val food = p0.getValue(FBFood::class.java)!!
                            val repo = LocalFoodRepository()
                            doAsync {
                                repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
                                repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
                                repo.saveFood(this@LoginActivity, personalizedFood)
                                it.onNext(true)
                            }

                        }else {
                            it.onNext(false)
                        }
                    }

                })
            }
        }.observeOn(Schedulers.io()).doOnCompleted{
            dismissProgressDialog()
            finish()
        }.doOnError{
            it.printStackTrace()
            dismissProgressDialog()
            finish()
        }.subscribe()

谢谢。

来自 flatMapObservable 知道 "when to all of the items have been finished" 当它发出的所有 observable 都调用了 onCompleted()。您的代码中的第二个 flatMap 从不调用 onCompleted() 因为它创建的 none observables 调用 onCompleted().

您应该在 onDataChange() 方法中调用 onCompleted()。由于在 flatMap 中创建的每个 observable 只发出一项,因此可以在 onNext() 方法之后直接调用它:

override fun onDataChange(p0: DataSnapshot?) {
    if(p0 != null) {
        val food = p0.getValue(FBFood::class.java)!!
        val repo = LocalFoodRepository()
        doAsync {
            repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
            repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
            repo.saveFood(this@LoginActivity, personalizedFood)
            it.onNext(true)
            it.onCompleted()
        }
    } else {
        it.onNext(false)
        it.onCompleted()
    }
}