RxAndroid,如何检测可观察对象是否已完成发射
RxAndroid, How to detect if observable has finished emission
我正在编写以下代码片段以从 firebase 数据库中获取已保存食物的列表,然后使用该列表,我再次从 firebase 数据库中获取单个食物的详细信息。
以下代码工作正常,除了我无法弄清楚如何让第二个 flatMap 知道第一个 flatMap 的发射已经完成(所有食物列表都已处理)。所以我无法调用 onCompleted()
方法,因此无法检测到整个过程何时完成。
查看以下代码段中的评论:
Observable.create<List<PersonalizedFood>> {
FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener {
override fun onCancelled(p0: DatabaseError?) {
}
override fun onDataChange(p0: DataSnapshot?) {
val list = ArrayList<PersonalizedFood>()
p0?.let {
for (dateObject in p0.children) {
for (foodItem in dateObject.children) {
val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood
list.add(PersonalizedFood(food))
}
}
}
it.onNext(list)
it.onCompleted()
}
})
}.subscribeOn(Schedulers.io()).flatMap {
Observable.from(it) // returning a Observable that emits items of list ("it" is the list here)
}.observeOn(Schedulers.io()).flatMap {
// How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called.
personalizedFood ->
Observable.create<Boolean>{
FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{
override fun onCancelled(p0: DatabaseError?) {
it.onError(p0?.toException())
}
override fun onDataChange(p0: DataSnapshot?) {
if(p0 != null) {
val food = p0.getValue(FBFood::class.java)!!
val repo = LocalFoodRepository()
doAsync {
repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
repo.saveFood(this@LoginActivity, personalizedFood)
it.onNext(true)
}
}else {
it.onNext(false)
}
}
})
}
}.observeOn(Schedulers.io()).doOnCompleted{
dismissProgressDialog()
finish()
}.doOnError{
it.printStackTrace()
dismissProgressDialog()
finish()
}.subscribe()
谢谢。
来自 flatMap
的 Observable
知道 "when to all of the items have been finished" 当它发出的所有 observable 都调用了 onCompleted()
。您的代码中的第二个 flatMap
从不调用 onCompleted()
因为它创建的 none observables 调用 onCompleted()
.
您应该在 onDataChange()
方法中调用 onCompleted()
。由于在 flatMap
中创建的每个 observable 只发出一项,因此可以在 onNext()
方法之后直接调用它:
override fun onDataChange(p0: DataSnapshot?) {
if(p0 != null) {
val food = p0.getValue(FBFood::class.java)!!
val repo = LocalFoodRepository()
doAsync {
repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
repo.saveFood(this@LoginActivity, personalizedFood)
it.onNext(true)
it.onCompleted()
}
} else {
it.onNext(false)
it.onCompleted()
}
}
我正在编写以下代码片段以从 firebase 数据库中获取已保存食物的列表,然后使用该列表,我再次从 firebase 数据库中获取单个食物的详细信息。
以下代码工作正常,除了我无法弄清楚如何让第二个 flatMap 知道第一个 flatMap 的发射已经完成(所有食物列表都已处理)。所以我无法调用 onCompleted()
方法,因此无法检测到整个过程何时完成。
查看以下代码段中的评论:
Observable.create<List<PersonalizedFood>> {
FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener {
override fun onCancelled(p0: DatabaseError?) {
}
override fun onDataChange(p0: DataSnapshot?) {
val list = ArrayList<PersonalizedFood>()
p0?.let {
for (dateObject in p0.children) {
for (foodItem in dateObject.children) {
val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood
list.add(PersonalizedFood(food))
}
}
}
it.onNext(list)
it.onCompleted()
}
})
}.subscribeOn(Schedulers.io()).flatMap {
Observable.from(it) // returning a Observable that emits items of list ("it" is the list here)
}.observeOn(Schedulers.io()).flatMap {
// How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called.
personalizedFood ->
Observable.create<Boolean>{
FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{
override fun onCancelled(p0: DatabaseError?) {
it.onError(p0?.toException())
}
override fun onDataChange(p0: DataSnapshot?) {
if(p0 != null) {
val food = p0.getValue(FBFood::class.java)!!
val repo = LocalFoodRepository()
doAsync {
repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
repo.saveFood(this@LoginActivity, personalizedFood)
it.onNext(true)
}
}else {
it.onNext(false)
}
}
})
}
}.observeOn(Schedulers.io()).doOnCompleted{
dismissProgressDialog()
finish()
}.doOnError{
it.printStackTrace()
dismissProgressDialog()
finish()
}.subscribe()
谢谢。
来自 flatMap
的 Observable
知道 "when to all of the items have been finished" 当它发出的所有 observable 都调用了 onCompleted()
。您的代码中的第二个 flatMap
从不调用 onCompleted()
因为它创建的 none observables 调用 onCompleted()
.
您应该在 onDataChange()
方法中调用 onCompleted()
。由于在 flatMap
中创建的每个 observable 只发出一项,因此可以在 onNext()
方法之后直接调用它:
override fun onDataChange(p0: DataSnapshot?) {
if(p0 != null) {
val food = p0.getValue(FBFood::class.java)!!
val repo = LocalFoodRepository()
doAsync {
repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
repo.saveFood(this@LoginActivity, personalizedFood)
it.onNext(true)
it.onCompleted()
}
} else {
it.onNext(false)
it.onCompleted()
}
}