Rxjava 大号迭代然后跳过记录
Rxjava large no. of Iteration then skip the records
我卡在了当前项目的一个流程中。我不能多谈那个项目,这就是为什么我要解释我卡住的部分。
项目中有很多记录,比如说 200 万,我想将它们全部迭代。
我尝试通过多种方式迭代它们,在我的例子中,它跳过了一些记录,如果发生错误,那么我想 return 成功迭代的记录。
我想我之前遇到过这个问题。以下代码将帮助您实现此目的。
private val compositeDisposable = CompositeDisposable()
fun thenAllValuesAreBufferedAndReceived() {
val observable = Observable.rangeLong(0,10000)
val testSubscriber = observable
.toFlowable(BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.buffer(500)
//.subscribe(::println)
.subscribeWith(object :FlowableSubscriber<List<Long>>{
override fun onComplete() {
println("Flowable OnComplete")
}
override fun onSubscribe(s: Subscription) {
println("Flowable onSubscribe")
s.request(20)
}
override fun onNext(t: List<Long>?) {
println("Flowable onNext $t")
}
override fun onError(t: Throwable?) {
println("Flowable onError")
}
})
}
覆盖 onDistroy 方法并通过以下代码清除 compositeDisposable。这将使您的应用程序不会崩溃。
override fun onDestroy() {
super.onDestroy()
compositeDisposable.clear()
compositeDisposable.dispose()
}
我卡在了当前项目的一个流程中。我不能多谈那个项目,这就是为什么我要解释我卡住的部分。
项目中有很多记录,比如说 200 万,我想将它们全部迭代。
我尝试通过多种方式迭代它们,在我的例子中,它跳过了一些记录,如果发生错误,那么我想 return 成功迭代的记录。
我想我之前遇到过这个问题。以下代码将帮助您实现此目的。
private val compositeDisposable = CompositeDisposable()
fun thenAllValuesAreBufferedAndReceived() {
val observable = Observable.rangeLong(0,10000)
val testSubscriber = observable
.toFlowable(BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.buffer(500)
//.subscribe(::println)
.subscribeWith(object :FlowableSubscriber<List<Long>>{
override fun onComplete() {
println("Flowable OnComplete")
}
override fun onSubscribe(s: Subscription) {
println("Flowable onSubscribe")
s.request(20)
}
override fun onNext(t: List<Long>?) {
println("Flowable onNext $t")
}
override fun onError(t: Throwable?) {
println("Flowable onError")
}
})
}
覆盖 onDistroy 方法并通过以下代码清除 compositeDisposable。这将使您的应用程序不会崩溃。
override fun onDestroy() {
super.onDestroy()
compositeDisposable.clear()
compositeDisposable.dispose()
}