如何使用RxJava的retryWhen方法多次调用一个方法?
How to call a method multiple times using RxJava's method retryWhen?
我有一个执行网络请求的方法,它返回一个 Observable,其中包含数据类(data class)的可变列表(MutableList)。
有时此方法会因 403 错误而失败。这时我需要调用 YouTubeClient.getApiKey() 方法来获取新的 API 密钥,然后重新发起网络请求。该怎么做?我阅读了很多类似的主题,但没有找到可行的解决方案。
下面是工具方法的代码,我在其中尝试调用 retryWhen() 方法:
/**
 * Runs a YouTube search for [query] and emits the `items` of each response.
 *
 * When the request fails, `YouTubeClient.getApiKey()` is called and the request is issued
 * again, up to three times. Once the retries are used up, the last error is delivered to the
 * subscriber's `onError` instead of the stream completing silently.
 *
 * @param query search text forwarded to `youTubeClient.searchRequest`.
 * @return an [Observable] emitting the `items` list taken from each response.
 */
private fun searchRequestWrapper(query: String): Observable<MutableList<Video>> {
    val maxRetries = 3
    return Observable
        // defer: the request is rebuilt on every (re)subscription, so YouTubeClient.API_KEY is
        // read again for each attempt instead of being captured once when the chain is assembled.
        // NOTE(review): assumes YouTubeClient.getApiKey() refreshes YouTubeClient.API_KEY — confirm.
        .defer {
            youTubeClient.searchRequest(
                YouTubeClient.URL_SNIPPET,
                YouTubeClient.MAX_RESULT, query,
                YouTubeClient.API_KEY
            )
        }
        .retryWhen { errors ->
            errors
                // One extra range element so the (maxRetries + 1)-th failure can be surfaced as
                // an error; without it the zip would just complete and hide the failure.
                .zipWith(Observable.range(1, maxRetries + 1)) { error, attempt -> error to attempt }
                .flatMap { signal ->
                    if (signal.second > maxRetries) {
                        Observable.error<Int>(signal.first)
                    } else {
                        YouTubeClient.getApiKey()
                        Observable.just(signal.second)
                    }
                }
        }
        .map { it.items }
}
下面是在内部调用该工具方法的主方法:
/**
 * Searches for videos matching [query], loads the details of every result and publishes them.
 *
 * Sets the network state to `LOADING` before subscribing. On success the collected list is
 * posted to `downloadedVideosList` and the state goes to `LOADED` and then `WAITING`.
 * Errors emitted by the stream are passed to `errorHandle`.
 *
 * @param query search text forwarded to `searchRequestWrapper`.
 */
fun fetchVideos(query: String) {
    _networkState.set(NetworkState.LOADING)
    Log.e("NetworkState", networkState.get()?.status.toString())
    try {
        compositeDisposable.add(
            searchRequestWrapper(query)
                .flatMapIterable { it }
                // Each detail request is subscribed on the io scheduler.
                .flatMap { video -> videoInfoWrapper(video.videoId).subscribeOn(Schedulers.io()) }
                .toList()
                .subscribeOn(Schedulers.io())
                .subscribe({ videos ->
                    Log.e("new videosId", videos.toString())
                    downloadedVideosList.postValue(videos)
                    _networkState.set(NetworkState.LOADED)
                    Log.e("NetworkState", networkState.get()?.status.toString())
                    _networkState.set(NetworkState.WAITING)
                    Log.e("NetworkState", networkState.get()?.status.toString())
                }, { error ->
                    errorHandle(error)
                })
        )
    } catch (e: Exception) {
        // e.message may be null, which would make the logging call itself fail; fall back to
        // the exception's string form and pass the throwable so the stack trace is logged.
        Log.e("fetchVideos", e.message ?: e.toString(), e)
    }
}
这个解决方案怎么样?
其实你在使用retryWhen时出错了。 retryWhen 运算符只是发出是否应该重试的信号。每次内部 observable 发出一个值时,都会开始重试。
在我的示例中,403 异常由 onErrorResumeNext 捕获:它拦截上游发出的 onError,并改为订阅一个备用(fallback)Observable。位于该运算符之后的 retryWhen 则确保其他错误最多重试 3 次。
仅供参考:所有转换数据的运算符都应当只处理不可变数据,因此应避免使用 MutableList。
/**
 * Example test: a search that fails with "403" is recovered by requesting an API key and
 * searching again, while any other error is retried through `retryWhen`.
 */
class So65040953 {
    @Test
    fun main() {
        val testScheduler = TestScheduler()
        val client = YouTubeClientInterceptor(YouTubeClientStub())

        // On "403": request a key, then search again. Any other error is propagated unchanged.
        val recovering = client.searchRequest("fail")
            .onErrorResumeNext { failure ->
                if (failure.message == "403") {
                    client.requestApiKey()
                        .flatMapObservable { client.searchRequest("good") }
                } else {
                    Observable.error(failure)
                }
            }

        // Errors other than 403 (from #searchRequest or #requestApiKey) trigger a retry
        // every second, for the first three errors.
        val observer = recovering
            .retryWhen { failures ->
                failures.take(3).delay(1L, TimeUnit.SECONDS, testScheduler)
            }
            .test()

        testScheduler.advanceTimeBy(10, TimeUnit.SECONDS)

        assertThat(client.requestApiKey.get()).isEqualTo(1)
        assertThat(client.searchRequestCount.get()).isEqualTo(2)
        observer.assertNoErrors()
        observer.assertValue(listOf(Video("42")))
    }
}
/**
 * Minimal client abstraction used by the example: one search call and one API-key request.
 */
internal interface YouTubeClient {
/** Returns an [Observable] of video lists for the given [query]. */
fun searchRequest(query: String): Observable<List<Video>>
/** Returns a [Single] that yields an API key string. */
fun requestApiKey(): Single<String>
}
/**
 * Wrapper around another [YouTubeClient] that counts how often each operation is invoked
 * and then forwards the call to the wrapped client.
 */
internal class YouTubeClientInterceptor(private val client: YouTubeClient) : YouTubeClient {
    /** Number of `searchRequest(...)` invocations so far. */
    internal val searchRequestCount = AtomicInteger(0)

    /** Number of `requestApiKey()` invocations so far. */
    internal val requestApiKey = AtomicInteger(0)

    override fun searchRequest(query: String): Observable<List<Video>> =
        counted(searchRequestCount) { client.searchRequest(query) }

    override fun requestApiKey(): Single<String> =
        counted(requestApiKey) { client.requestApiKey() }

    /** Bumps [counter] first, then runs [call] and returns its result. */
    private inline fun <T> counted(counter: AtomicInteger, call: () -> T): T {
        counter.getAndIncrement()
        return call()
    }
}
/**
 * Canned [YouTubeClient] for the example: the query text selects the outcome of a search,
 * and the API key is a fixed string.
 */
internal class YouTubeClientStub : YouTubeClient {
    /**
     * Prints the query, then returns an error with message "403" for "fail", a single
     * one-element list for "good", and an empty stream for anything else.
     */
    override fun searchRequest(query: String): Observable<List<Video>> {
        println("query: $query")
        if (query == "fail") {
            return Observable.error<List<Video>>(RuntimeException("403"))
        }
        if (query == "good") {
            return Observable.just(listOf(Video("42")))
        }
        return Observable.empty()
    }

    /** Prints a marker line and returns a fixed key. */
    override fun requestApiKey(): Single<String> {
        println("requestApiKey")
        val fixedKey = "1-1-1-1"
        return Single.just(fixedKey)
    }
}
/** Value holder for a video id; as a data class, equality is based on [videoId]. */
internal data class Video(private val videoId: String)
我有一个执行网络请求的方法,它返回一个 Observable,其中包含数据类(data class)的可变列表(MutableList)。有时此方法会因 403 错误而失败。这时我需要调用 YouTubeClient.getApiKey() 方法来获取新的 API 密钥,然后重新发起网络请求。该怎么做?我阅读了很多类似的主题,但没有找到可行的解决方案。
下面是工具方法的代码,我在其中尝试调用 retryWhen() 方法:
/**
 * Runs a YouTube search for [query] and emits the `items` of each response.
 *
 * When the request fails, `YouTubeClient.getApiKey()` is called and the request is issued
 * again, up to three times. Once the retries are used up, the last error is delivered to the
 * subscriber's `onError` instead of the stream completing silently.
 *
 * @param query search text forwarded to `youTubeClient.searchRequest`.
 * @return an [Observable] emitting the `items` list taken from each response.
 */
private fun searchRequestWrapper(query: String): Observable<MutableList<Video>> {
    val maxRetries = 3
    return Observable
        // defer: the request is rebuilt on every (re)subscription, so YouTubeClient.API_KEY is
        // read again for each attempt instead of being captured once when the chain is assembled.
        // NOTE(review): assumes YouTubeClient.getApiKey() refreshes YouTubeClient.API_KEY — confirm.
        .defer {
            youTubeClient.searchRequest(
                YouTubeClient.URL_SNIPPET,
                YouTubeClient.MAX_RESULT, query,
                YouTubeClient.API_KEY
            )
        }
        .retryWhen { errors ->
            errors
                // One extra range element so the (maxRetries + 1)-th failure can be surfaced as
                // an error; without it the zip would just complete and hide the failure.
                .zipWith(Observable.range(1, maxRetries + 1)) { error, attempt -> error to attempt }
                .flatMap { signal ->
                    if (signal.second > maxRetries) {
                        Observable.error<Int>(signal.first)
                    } else {
                        YouTubeClient.getApiKey()
                        Observable.just(signal.second)
                    }
                }
        }
        .map { it.items }
}
下面是在内部调用该工具方法的主方法:
/**
 * Searches for videos matching [query], loads the details of every result and publishes them.
 *
 * Sets the network state to `LOADING` before subscribing. On success the collected list is
 * posted to `downloadedVideosList` and the state goes to `LOADED` and then `WAITING`.
 * Errors emitted by the stream are passed to `errorHandle`.
 *
 * @param query search text forwarded to `searchRequestWrapper`.
 */
fun fetchVideos(query: String) {
    _networkState.set(NetworkState.LOADING)
    Log.e("NetworkState", networkState.get()?.status.toString())
    try {
        compositeDisposable.add(
            searchRequestWrapper(query)
                .flatMapIterable { it }
                // Each detail request is subscribed on the io scheduler.
                .flatMap { video -> videoInfoWrapper(video.videoId).subscribeOn(Schedulers.io()) }
                .toList()
                .subscribeOn(Schedulers.io())
                .subscribe({ videos ->
                    Log.e("new videosId", videos.toString())
                    downloadedVideosList.postValue(videos)
                    _networkState.set(NetworkState.LOADED)
                    Log.e("NetworkState", networkState.get()?.status.toString())
                    _networkState.set(NetworkState.WAITING)
                    Log.e("NetworkState", networkState.get()?.status.toString())
                }, { error ->
                    errorHandle(error)
                })
        )
    } catch (e: Exception) {
        // e.message may be null, which would make the logging call itself fail; fall back to
        // the exception's string form and pass the throwable so the stack trace is logged.
        Log.e("fetchVideos", e.message ?: e.toString(), e)
    }
}
这个解决方案怎么样?
其实你在使用retryWhen时出错了。 retryWhen 运算符只是发出是否应该重试的信号。每次内部 observable 发出一个值时,都会开始重试。
在我的示例中,403 异常由 onErrorResumeNext 捕获:它拦截上游发出的 onError,并改为订阅一个备用(fallback)Observable。位于该运算符之后的 retryWhen 则确保其他错误最多重试 3 次。
仅供参考:所有转换数据的运算符都应当只处理不可变数据,因此应避免使用 MutableList。
/**
 * Example test: a search that fails with "403" is recovered by requesting an API key and
 * searching again, while any other error is retried through `retryWhen`.
 */
class So65040953 {
    @Test
    fun main() {
        val testScheduler = TestScheduler()
        val client = YouTubeClientInterceptor(YouTubeClientStub())

        // On "403": request a key, then search again. Any other error is propagated unchanged.
        val recovering = client.searchRequest("fail")
            .onErrorResumeNext { failure ->
                if (failure.message == "403") {
                    client.requestApiKey()
                        .flatMapObservable { client.searchRequest("good") }
                } else {
                    Observable.error(failure)
                }
            }

        // Errors other than 403 (from #searchRequest or #requestApiKey) trigger a retry
        // every second, for the first three errors.
        val observer = recovering
            .retryWhen { failures ->
                failures.take(3).delay(1L, TimeUnit.SECONDS, testScheduler)
            }
            .test()

        testScheduler.advanceTimeBy(10, TimeUnit.SECONDS)

        assertThat(client.requestApiKey.get()).isEqualTo(1)
        assertThat(client.searchRequestCount.get()).isEqualTo(2)
        observer.assertNoErrors()
        observer.assertValue(listOf(Video("42")))
    }
}
/**
 * Minimal client abstraction used by the example: one search call and one API-key request.
 */
internal interface YouTubeClient {
/** Returns an [Observable] of video lists for the given [query]. */
fun searchRequest(query: String): Observable<List<Video>>
/** Returns a [Single] that yields an API key string. */
fun requestApiKey(): Single<String>
}
/**
 * Wrapper around another [YouTubeClient] that counts how often each operation is invoked
 * and then forwards the call to the wrapped client.
 */
internal class YouTubeClientInterceptor(private val client: YouTubeClient) : YouTubeClient {
    /** Number of `searchRequest(...)` invocations so far. */
    internal val searchRequestCount = AtomicInteger(0)

    /** Number of `requestApiKey()` invocations so far. */
    internal val requestApiKey = AtomicInteger(0)

    override fun searchRequest(query: String): Observable<List<Video>> =
        counted(searchRequestCount) { client.searchRequest(query) }

    override fun requestApiKey(): Single<String> =
        counted(requestApiKey) { client.requestApiKey() }

    /** Bumps [counter] first, then runs [call] and returns its result. */
    private inline fun <T> counted(counter: AtomicInteger, call: () -> T): T {
        counter.getAndIncrement()
        return call()
    }
}
/**
 * Canned [YouTubeClient] for the example: the query text selects the outcome of a search,
 * and the API key is a fixed string.
 */
internal class YouTubeClientStub : YouTubeClient {
    /**
     * Prints the query, then returns an error with message "403" for "fail", a single
     * one-element list for "good", and an empty stream for anything else.
     */
    override fun searchRequest(query: String): Observable<List<Video>> {
        println("query: $query")
        if (query == "fail") {
            return Observable.error<List<Video>>(RuntimeException("403"))
        }
        if (query == "good") {
            return Observable.just(listOf(Video("42")))
        }
        return Observable.empty()
    }

    /** Prints a marker line and returns a fixed key. */
    override fun requestApiKey(): Single<String> {
        println("requestApiKey")
        val fixedKey = "1-1-1-1"
        return Single.just(fixedKey)
    }
}
/** Value holder for a video id; as a data class, equality is based on [videoId]. */
internal data class Video(private val videoId: String)