Rxjava 大号迭代然后跳过记录

Rxjava large no. of Iteration then skip the records

我卡在了当前项目的一个流程中。我不能多谈那个项目,这就是为什么我要解释我卡住的部分。

项目中有很多记录,比如说 200 万,我想将它们全部迭代。

我尝试通过多种方式迭代它们,在我的例子中,它跳过了一些记录,如果发生错误,那么我想 return 成功迭代的记录。

我想我之前遇到过这个问题。以下代码将帮助您实现此目的。

private val compositeDisposable = CompositeDisposable()
fun thenAllValuesAreBufferedAndReceived() {
    val observable = Observable.rangeLong(0,10000)
    val testSubscriber = observable
    .toFlowable(BackpressureStrategy.BUFFER)
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.computation())
    .buffer(500)
    //.subscribe(::println)
    .subscribeWith(object :FlowableSubscriber<List<Long>>{
    override fun onComplete() {
    println("Flowable OnComplete")
    }

    override fun onSubscribe(s: Subscription) {
    println("Flowable onSubscribe")
    s.request(20)

    }

    override fun onNext(t: List<Long>?) {
    println("Flowable onNext $t")
    }

    override fun onError(t: Throwable?) {
    println("Flowable onError")
    }

    })
}

覆盖 onDistroy 方法并通过以下代码清除 compositeDisposable。这将使您的应用程序不会崩溃。

override fun onDestroy() {
    super.onDestroy()
    compositeDisposable.clear()
    compositeDisposable.dispose()
}