如何处理 RxJava 中多个链式 Observable 的错误?

How to handle error on multiple chained Observable in RxJava?

我正在使用 Kotlin、RxJava 和 Retrofit 开发 Android 应用程序。 我想向服务器发送 Http 请求。

  1. PUT - 作业的更新选项
  2. POST - 运行 工作

第一次请求成功后,我发送第二次请求。 所以我用了concatMap。

val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST

updateJob.concatMap { runJob }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ job ->
        Log.d(TAG, "runJob - success: $job")
    }, {
        Log.e(TAG, "runJob - failed: ${it.message}")
        it.printStackTrace()
    })

我想要的是取消,如果第一个请求失败。 我应该怎么做?

这是一个可能的代码。 但是……这段代码是……我觉得这很难看…… 请问有什么很酷的代码吗?

disposable.add(
            restService.updateJob(token, job.id, options)    // PUT
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ job ->
                    Log.d(TAG, "updateJob - success")
                    restService.runJob(token, job.id)    // POST
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe({ job ->
                            Log.d(TAG, "runJob - success")
                        }, {
                            Log.e(TAG, "runJob - failed: ${it.message}")
                            it.printStackTrace()
                        })
                }, {
                    Log.e(TAG, "updateJob - failed: ${it.message}")
                    it.printStackTrace()
                })
        )

我还有一个问题。 我有工作清单。 我也想做同样的事情。 即使一些工作失败了,我也想继续下一个工作。 我考虑过"onErrorResumeNext, onErrorReturn, doOnError"。 但它们不是解决方案。 我该怎么做?

Observable.fromIterable(jobs)
            .concatMap { job ->
                val updateJob = restService.updateJob(token, job.id, options)
                val printJob = restService.printJob(token, job.id)

                updateJob.concatMap { printJob }
            }
            .window(1)    // I thought "window" can be the solution. But it doesn't work.
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ job ->
                Log.d(TAG, "runJobs - success")

            }, {
                Log.e(TAG, "runJobs - failed: ${it.message}")
                it.printStackTrace()
            })

其实你已经给出了答案。 你的第一个案例是正确的

val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST

updateJob.concatMap { runJob }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ job ->
        Log.d(TAG, "runJob - success: $job")
    }, {
        Log.e(TAG, "runJob - failed: ${it.message}")
        it.printStackTrace()
    })

所以在这种情况下,如果 updateJob 请求失败,则流将移至 error stream 并且永远不会调用 runJob 请求。

runJob只有在updateJob成功时才会调用

并且在 updateJob 成功之后如果 runJob 失败,那么也会调用 error stream

不需要你的第二个解决方案。

对于你的第二个问题,onErrorResumeNext 应该可以。 Return 任何虚拟值并在 onNext

中处理它
Observable.fromIterable(jobs)
                    .concatMap { job -> restService.updateJob(token, job.id, options).onErrorResumeNext(Flowable.just(/*Send any dummy value*/)) }
                    .contcatMap { job -> restService.printJob(token, job.id).onErrorResumeNext{Flowable.just(/*Send any dummy value*/)} }
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe({ job ->
                        /*If dummy value received....handle it gracefully*/
                        Log.d(TAG, "runJobs - success")

                    }, {
                        Log.e(TAG, "runJobs - failed: ${it.message}")
                        it.printStackTrace()
                    })