如何处理 RxJava 中多个链式 Observable 的错误?
How to handle error on multiple chained Observable in RxJava?
我正在使用 Kotlin、RxJava 和 Retrofit 开发 Android 应用程序。
我想向服务器发送 Http 请求。
- PUT - 作业的更新选项
- POST - 运行 工作
第一次请求成功后,我发送第二次请求。
所以我用了concatMap。
val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST
updateJob.concatMap { runJob }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJob - success: $job")
}, {
Log.e(TAG, "runJob - failed: ${it.message}")
it.printStackTrace()
})
我想要的是取消,如果第一个请求失败。
我应该怎么做?
这是一个可能的代码。
但是……这段代码是……我觉得这很难看……
请问有什么很酷的代码吗?
disposable.add(
restService.updateJob(token, job.id, options) // PUT
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "updateJob - success")
restService.runJob(token, job.id) // POST
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJob - success")
}, {
Log.e(TAG, "runJob - failed: ${it.message}")
it.printStackTrace()
})
}, {
Log.e(TAG, "updateJob - failed: ${it.message}")
it.printStackTrace()
})
)
我还有一个问题。
我有工作清单。
我也想做同样的事情。
即使一些工作失败了,我也想继续下一个工作。
我考虑过"onErrorResumeNext, onErrorReturn, doOnError"。
但它们不是解决方案。
我该怎么做?
Observable.fromIterable(jobs)
.concatMap { job ->
val updateJob = restService.updateJob(token, job.id, options)
val printJob = restService.printJob(token, job.id)
updateJob.concatMap { printJob }
}
.window(1) // I thought "window" can be the solution. But it doesn't work.
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJobs - success")
}, {
Log.e(TAG, "runJobs - failed: ${it.message}")
it.printStackTrace()
})
其实你已经给出了答案。
你的第一个案例是正确的
val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST
updateJob.concatMap { runJob }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJob - success: $job")
}, {
Log.e(TAG, "runJob - failed: ${it.message}")
it.printStackTrace()
})
所以在这种情况下,如果 updateJob
请求失败,则流将移至 error stream
并且永远不会调用 runJob
请求。
runJob
只有在updateJob
成功时才会调用
并且在 updateJob
成功之后如果 runJob
失败,那么也会调用 error stream
。
不需要你的第二个解决方案。
对于你的第二个问题,onErrorResumeNext
应该可以。 Return 任何虚拟值并在 onNext
中处理它
Observable.fromIterable(jobs)
.concatMap { job -> restService.updateJob(token, job.id, options).onErrorResumeNext(Flowable.just(/*Send any dummy value*/)) }
.contcatMap { job -> restService.printJob(token, job.id).onErrorResumeNext{Flowable.just(/*Send any dummy value*/)} }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
/*If dummy value received....handle it gracefully*/
Log.d(TAG, "runJobs - success")
}, {
Log.e(TAG, "runJobs - failed: ${it.message}")
it.printStackTrace()
})
我正在使用 Kotlin、RxJava 和 Retrofit 开发 Android 应用程序。 我想向服务器发送 Http 请求。
- PUT - 作业的更新选项
- POST - 运行 工作
第一次请求成功后,我发送第二次请求。 所以我用了concatMap。
val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST
updateJob.concatMap { runJob }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJob - success: $job")
}, {
Log.e(TAG, "runJob - failed: ${it.message}")
it.printStackTrace()
})
我想要的是取消,如果第一个请求失败。 我应该怎么做?
这是一个可能的代码。 但是……这段代码是……我觉得这很难看…… 请问有什么很酷的代码吗?
disposable.add(
restService.updateJob(token, job.id, options) // PUT
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "updateJob - success")
restService.runJob(token, job.id) // POST
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJob - success")
}, {
Log.e(TAG, "runJob - failed: ${it.message}")
it.printStackTrace()
})
}, {
Log.e(TAG, "updateJob - failed: ${it.message}")
it.printStackTrace()
})
)
我还有一个问题。 我有工作清单。 我也想做同样的事情。 即使一些工作失败了,我也想继续下一个工作。 我考虑过"onErrorResumeNext, onErrorReturn, doOnError"。 但它们不是解决方案。 我该怎么做?
Observable.fromIterable(jobs)
.concatMap { job ->
val updateJob = restService.updateJob(token, job.id, options)
val printJob = restService.printJob(token, job.id)
updateJob.concatMap { printJob }
}
.window(1) // I thought "window" can be the solution. But it doesn't work.
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJobs - success")
}, {
Log.e(TAG, "runJobs - failed: ${it.message}")
it.printStackTrace()
})
其实你已经给出了答案。 你的第一个案例是正确的
val updateJob = restService.updateJob(token, job.id, options) // PUT
val runJob = restService.runJob(token, job.id) // POST
updateJob.concatMap { runJob }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
Log.d(TAG, "runJob - success: $job")
}, {
Log.e(TAG, "runJob - failed: ${it.message}")
it.printStackTrace()
})
所以在这种情况下,如果 updateJob
请求失败,则流将移至 error stream
并且永远不会调用 runJob
请求。
runJob
只有在updateJob
成功时才会调用
并且在 updateJob
成功之后如果 runJob
失败,那么也会调用 error stream
。
不需要你的第二个解决方案。
对于你的第二个问题,onErrorResumeNext
应该可以。 Return 任何虚拟值并在 onNext
Observable.fromIterable(jobs)
.concatMap { job -> restService.updateJob(token, job.id, options).onErrorResumeNext(Flowable.just(/*Send any dummy value*/)) }
.contcatMap { job -> restService.printJob(token, job.id).onErrorResumeNext{Flowable.just(/*Send any dummy value*/)} }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ job ->
/*If dummy value received....handle it gracefully*/
Log.d(TAG, "runJobs - success")
}, {
Log.e(TAG, "runJobs - failed: ${it.message}")
it.printStackTrace()
})