多个 Rxjava retryWhen 用于处理不同的错误

Multiple Rxjava retryWhen for handling different error

在我的项目中,我有一个方法可以以相同的方式订阅我的 Observable。我正在尝试通过在其上添加 retryWhen 选项来增强它。

为了避免在处理不同的错误时进行大量重试,我设计了这个逻辑

通用的 RetryFunction class

abstract class RxStreamLimitedRetryFunction(private val nbOfAttempts: Int, val streamId: String) : Function<Observable<Throwable>, Observable<*>> {

override fun apply(t: Observable<Throwable>): Observable<*> {
    return t.flatMap {
        if (shouldRetry(it)) Observable.just(it)
        else Observable.empty()
    }.zipWith(Observable.range(0, nbOfAttempts + 1), BiFunction<Throwable, Int, Int> { throwable, attempts ->
        if (attempts == nbOfAttempts) {
            throw RetryMaxAttemptsException(nbOfAttempts)
        } else {
            Log.d("Retry nb ${attempts + 1} out of $nbOfAttempts for stream with id : $streamId with error ${throwable.message} ")
            attempts
        }
    }).flatMap { onRetry(it) }

}

abstract fun onRetry(attempsNb: Int): Observable<*>
abstract fun shouldRetry(throwable: Throwable): Boolean

}

两次 child class 每次在错误发生后进行不同的重试

class RxStream404Retry(streamId: String) : RxStreamLimitedRetryFunction(4, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
    return Observable.timer(500, TimeUnit.MILLISECONDS)
}

override fun shouldRetry(throwable: Throwable): Boolean {
    return true
}  } 

class RxStream500Retry(streamId: String) : RxStreamLimitedRetryFunction(2, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
    return Observable.timer(500, TimeUnit.MILLISECONDS)
}

override fun shouldRetry(throwable: Throwable): Boolean {
    return false
}}

所有这些重试函数都在重试函数列表中找到它们的方式,该列表使用 ObservableTransformer 通过每个函数的 retryWhen 设置为 observable

class RetryComposer : ObservableTransformer<RxStreamSuccess, RxStreamSuccess> {

val retryFunctionList = arrayListOf(RxStream404Retry("Test1"),
    RxStream500Retry("Test2")
)


override fun apply(upstream: Observable<RxStreamSuccess>): ObservableSource<RxStreamSuccess> {
    retryFunctionList.forEach {
        upstream.retryWhen(it)
    }

    return upstream
}}

我的订阅链是这样的:

  streamCache[stremId] =   observable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnSubscribe { listener.onLoading() }
        .compose(RetryComposer())            
        .doOnComplete {
            Log.d(" Retry onComplete")
            streamCache.remove(stremId) }
        .subscribe(
            { result -> listener.onSuccess(result) },
            { throwable ->
                streamCache.remove(stremId)
            }
        )

当我测试一个出现错误的可观察对象时,没有任何反应发生,我的 RxStream404Retry 没有被触发。每个可观察的重试次数不能超过一次吗?

非常感谢

我认为问题来自:

retryFunctionList.forEach {
    upstream.retryWhen(it) <- this returns a new Observable that is not attached to any subscriber
}

此代码等同于:

Observable obs1 = upstream.retryWhen(RxStream404Retry("Test1"))
Observable obs2 = upstream.retryWhen(RxStream500Retry("Test2"))
return upstream

所以,这些可观察量没有被主 Rx 链的订阅者订阅。 您可能已经查看了 amb() 运算符 (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#amb-java.lang.Iterable-)

您可以尝试类似的方法:

return upstream.retryWhen(amb(retryFunctionList)) // pseudo code

大概就是这样。