RxJava Kotlin 组合第一个元素前的最新超时
RxJava Kotlin combineLatest timeout before first element
我正在使用 Kotlin 开发 android 应用程序。我有 3 个实时数据可观察值。数据来自 Firestore。它们被包裹在 RxJava 的 Observable.combineLatest() 方法中。我想在第一次数据检索时设置超时。
我曾尝试在每个可观察对象上设置超时函数,但它们会在加载所有 3 个可观察对象的初始数据后抛出 TimeoutException。
private fun retrieveAllData() {
Observable.combineLatest(
retrieveDataObservable1().timeout(10, TimeUnit.SECONDS),
retrieveDataObservable2().timeout(10, TimeUnit.SECONDS),
retrieveDataObservable3().timeout(10, TimeUnit.SECONDS),
Function3<String, String, Boolean, Triple<String, String, Boolean>>
{ firstResult, secondResult, ThirdResult ->
Triple(firstResult, secondResult, ThirdResult)
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ Log.d(TAG, "success") },
{ throwable -> Log.d(TAG, "error", throwable) }
)
}
private fun retrieveDataObservable1(): Observable<String> {
return Observable.create<String> { emitter ->
val listener = dataRef1.addSnapshotListener { snapshot, e ->
if (e != null) {
emitter.onError(e)
return@addSnapshotListener
}
emitter.onNext("some value")
}
emitter.setCancellable { listener.remove() }
}
}
如果至少有一个 Observables 在 10 秒内没有发出任何项目,我预计在初始检索时 Observable.combineLatest() 会出现 TimeoutExceiption。
如果所有 observables 都至少成功地发射了一次数据,那么就不应该有 TimeoutException。
它认为这对你有用:
import io.reactivex.Observable
import io.reactivex.functions.Function
import io.reactivex.functions.Function3
import java.util.concurrent.TimeUnit
Observable.combineLatest(
firstObservable,
secondObservable,
thirdObservable,
Function3<String, String, String, String> { first, second, third -> "$first$second$third" })
.timeout<String, String>(
Observable.empty<String>().delay(10, TimeUnit.SECONDS),
Function { Observable.never<String>() }
)
.subscribe {
println("$it")
}
timeout()
运算符的第一个参数是第一个项目超时的 Observable
。在我们的例子中是 10 秒。对于即将到来的项目,这是一个对所有项目 returns Observable.never()
的函数,因此它永远不会触发超时。
我正在使用 Kotlin 开发 android 应用程序。我有 3 个实时数据可观察值。数据来自 Firestore。它们被包裹在 RxJava 的 Observable.combineLatest() 方法中。我想在第一次数据检索时设置超时。
我曾尝试在每个可观察对象上设置超时函数,但它们会在加载所有 3 个可观察对象的初始数据后抛出 TimeoutException。
private fun retrieveAllData() {
Observable.combineLatest(
retrieveDataObservable1().timeout(10, TimeUnit.SECONDS),
retrieveDataObservable2().timeout(10, TimeUnit.SECONDS),
retrieveDataObservable3().timeout(10, TimeUnit.SECONDS),
Function3<String, String, Boolean, Triple<String, String, Boolean>>
{ firstResult, secondResult, ThirdResult ->
Triple(firstResult, secondResult, ThirdResult)
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ Log.d(TAG, "success") },
{ throwable -> Log.d(TAG, "error", throwable) }
)
}
private fun retrieveDataObservable1(): Observable<String> {
return Observable.create<String> { emitter ->
val listener = dataRef1.addSnapshotListener { snapshot, e ->
if (e != null) {
emitter.onError(e)
return@addSnapshotListener
}
emitter.onNext("some value")
}
emitter.setCancellable { listener.remove() }
}
}
如果至少有一个 Observables 在 10 秒内没有发出任何项目,我预计在初始检索时 Observable.combineLatest() 会出现 TimeoutExceiption。 如果所有 observables 都至少成功地发射了一次数据,那么就不应该有 TimeoutException。
它认为这对你有用:
import io.reactivex.Observable
import io.reactivex.functions.Function
import io.reactivex.functions.Function3
import java.util.concurrent.TimeUnit
Observable.combineLatest(
firstObservable,
secondObservable,
thirdObservable,
Function3<String, String, String, String> { first, second, third -> "$first$second$third" })
.timeout<String, String>(
Observable.empty<String>().delay(10, TimeUnit.SECONDS),
Function { Observable.never<String>() }
)
.subscribe {
println("$it")
}
timeout()
运算符的第一个参数是第一个项目超时的 Observable
。在我们的例子中是 10 秒。对于即将到来的项目,这是一个对所有项目 returns Observable.never()
的函数,因此它永远不会触发超时。