多个 Rxjava retryWhen 用于处理不同的错误
Multiple Rxjava retryWhen for handling different error
在我的项目中,我有一个方法可以以相同的方式订阅我的 Observable。我正在尝试通过在其上添加 retryWhen 选项来增强它。
为了避免在处理不同的错误时进行大量重试,我设计了这个逻辑
通用的 RetryFunction class
abstract class RxStreamLimitedRetryFunction(private val nbOfAttempts: Int, val streamId: String) : Function<Observable<Throwable>, Observable<*>> {
override fun apply(t: Observable<Throwable>): Observable<*> {
return t.flatMap {
if (shouldRetry(it)) Observable.just(it)
else Observable.empty()
}.zipWith(Observable.range(0, nbOfAttempts + 1), BiFunction<Throwable, Int, Int> { throwable, attempts ->
if (attempts == nbOfAttempts) {
throw RetryMaxAttemptsException(nbOfAttempts)
} else {
Log.d("Retry nb ${attempts + 1} out of $nbOfAttempts for stream with id : $streamId with error ${throwable.message} ")
attempts
}
}).flatMap { onRetry(it) }
}
abstract fun onRetry(attempsNb: Int): Observable<*>
abstract fun shouldRetry(throwable: Throwable): Boolean
}
两次 child class 每次在错误发生后进行不同的重试
class RxStream404Retry(streamId: String) : RxStreamLimitedRetryFunction(4, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
return Observable.timer(500, TimeUnit.MILLISECONDS)
}
override fun shouldRetry(throwable: Throwable): Boolean {
return true
} }
class RxStream500Retry(streamId: String) : RxStreamLimitedRetryFunction(2, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
return Observable.timer(500, TimeUnit.MILLISECONDS)
}
override fun shouldRetry(throwable: Throwable): Boolean {
return false
}}
- 本例简化了shouldRetry方法
所有这些重试函数都在重试函数列表中找到它们的方式,该列表使用 ObservableTransformer 通过每个函数的 retryWhen 设置为 observable
class RetryComposer : ObservableTransformer<RxStreamSuccess, RxStreamSuccess> {
val retryFunctionList = arrayListOf(RxStream404Retry("Test1"),
RxStream500Retry("Test2")
)
override fun apply(upstream: Observable<RxStreamSuccess>): ObservableSource<RxStreamSuccess> {
retryFunctionList.forEach {
upstream.retryWhen(it)
}
return upstream
}}
我的订阅链是这样的:
streamCache[stremId] = observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { listener.onLoading() }
.compose(RetryComposer())
.doOnComplete {
Log.d(" Retry onComplete")
streamCache.remove(stremId) }
.subscribe(
{ result -> listener.onSuccess(result) },
{ throwable ->
streamCache.remove(stremId)
}
)
当我测试一个出现错误的可观察对象时,没有任何反应发生,我的 RxStream404Retry 没有被触发。每个可观察的重试次数不能超过一次吗?
非常感谢
我认为问题来自:
retryFunctionList.forEach {
upstream.retryWhen(it) <- this returns a new Observable that is not attached to any subscriber
}
此代码等同于:
Observable obs1 = upstream.retryWhen(RxStream404Retry("Test1"))
Observable obs2 = upstream.retryWhen(RxStream500Retry("Test2"))
return upstream
所以,这些可观察量没有被主 Rx 链的订阅者订阅。
您可能已经查看了 amb()
运算符 (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#amb-java.lang.Iterable-)
您可以尝试类似的方法:
return upstream.retryWhen(amb(retryFunctionList)) // pseudo code
大概就是这样。
在我的项目中,我有一个方法可以以相同的方式订阅我的 Observable。我正在尝试通过在其上添加 retryWhen 选项来增强它。
为了避免在处理不同的错误时进行大量重试,我设计了这个逻辑
通用的 RetryFunction class
abstract class RxStreamLimitedRetryFunction(private val nbOfAttempts: Int, val streamId: String) : Function<Observable<Throwable>, Observable<*>> {
override fun apply(t: Observable<Throwable>): Observable<*> {
return t.flatMap {
if (shouldRetry(it)) Observable.just(it)
else Observable.empty()
}.zipWith(Observable.range(0, nbOfAttempts + 1), BiFunction<Throwable, Int, Int> { throwable, attempts ->
if (attempts == nbOfAttempts) {
throw RetryMaxAttemptsException(nbOfAttempts)
} else {
Log.d("Retry nb ${attempts + 1} out of $nbOfAttempts for stream with id : $streamId with error ${throwable.message} ")
attempts
}
}).flatMap { onRetry(it) }
}
abstract fun onRetry(attempsNb: Int): Observable<*>
abstract fun shouldRetry(throwable: Throwable): Boolean
}
两次 child class 每次在错误发生后进行不同的重试
class RxStream404Retry(streamId: String) : RxStreamLimitedRetryFunction(4, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
return Observable.timer(500, TimeUnit.MILLISECONDS)
}
override fun shouldRetry(throwable: Throwable): Boolean {
return true
} }
class RxStream500Retry(streamId: String) : RxStreamLimitedRetryFunction(2, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
return Observable.timer(500, TimeUnit.MILLISECONDS)
}
override fun shouldRetry(throwable: Throwable): Boolean {
return false
}}
- 本例简化了shouldRetry方法
所有这些重试函数都在重试函数列表中找到它们的方式,该列表使用 ObservableTransformer 通过每个函数的 retryWhen 设置为 observable
class RetryComposer : ObservableTransformer<RxStreamSuccess, RxStreamSuccess> {
val retryFunctionList = arrayListOf(RxStream404Retry("Test1"),
RxStream500Retry("Test2")
)
override fun apply(upstream: Observable<RxStreamSuccess>): ObservableSource<RxStreamSuccess> {
retryFunctionList.forEach {
upstream.retryWhen(it)
}
return upstream
}}
我的订阅链是这样的:
streamCache[stremId] = observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { listener.onLoading() }
.compose(RetryComposer())
.doOnComplete {
Log.d(" Retry onComplete")
streamCache.remove(stremId) }
.subscribe(
{ result -> listener.onSuccess(result) },
{ throwable ->
streamCache.remove(stremId)
}
)
当我测试一个出现错误的可观察对象时,没有任何反应发生,我的 RxStream404Retry 没有被触发。每个可观察的重试次数不能超过一次吗?
非常感谢
我认为问题来自:
retryFunctionList.forEach {
upstream.retryWhen(it) <- this returns a new Observable that is not attached to any subscriber
}
此代码等同于:
Observable obs1 = upstream.retryWhen(RxStream404Retry("Test1"))
Observable obs2 = upstream.retryWhen(RxStream500Retry("Test2"))
return upstream
所以,这些可观察量没有被主 Rx 链的订阅者订阅。
您可能已经查看了 amb()
运算符 (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#amb-java.lang.Iterable-)
您可以尝试类似的方法:
return upstream.retryWhen(amb(retryFunctionList)) // pseudo code
大概就是这样。