如何在重试之前在 RxJava2 (RxJava3) 链中使超时运算符触发一次异常
How to make timeout operator fire exception one time in RxJava2 (RxJava3) chain before retry
我正在尝试扩展我的自定义 FlowableTransformer
的功能,我用它有条件地从一个 Flowable
切换到另一个,由提供的 Function<T, Flowable<R>>
返回,即在我的 [=15] =] 如果初始项目符合某些条件,我将项目发送到服务器并切换到可流动的服务器响应:
/**
* Switches from upstreamFlowable to anotherFlowable
* using switchFunction if item from upstreamFlowable
* matches test condition
*
* usage:
* upstreamFlowable
* .compose(RxSwitchFlowablesOnCondition(condition, switchFunction))
**/
open class
RxSwitchFlowablesOnCondition<T, R>(private val condition : Predicate<T>,
private val switchFunction : Function<T, Flowable<R>>) : FlowableTransformer<T, R>
{
override fun
apply(upstreamFlowable : Flowable<T>) : Publisher<R>
{
return upstreamFlowable
.compose(RxFlowableOnIo<T>())
.filter {emittedItem : T ->
condition.test(emittedItem)
}
.switchMap {emittedItem : T ->
val anotherFlowable = switchFunction.apply(emittedItem)
anotherFlowable
//.timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS) // #1
}
//.timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS) // #2
}
}
要在发生错误时重新启动链,在我的 android 应用程序中,我使用 retry
运算符:
upstreamFlowable
.compose(RxSwitchFlowablesOnCondition(condition, switchFunction))
.retry() // to resubscribe on error
.subscribe(...)
我想做的是添加timeout
逻辑,例如,如果upstreamFlowable
发出一个项目和在anotherFlowable
获得响应之间的时间大于CONNECTION_TIMEOUT_SECONDS
,我想触发错误并重新订阅 upstreamFlowable
,这样我就可以等待下一个 emittedItem
。
我有:
- 如果我取消对行
#1
的注释,则超时会产生一个 TimeoutException
,重试重新订阅链,我会继续每隔 CONNECTION_TIMEOUT_SECONDS
时间段收到超时异常,即使没有下一个发出的项目. 如何跳过它们?
- 如果我取消注释行
#2
,那么我在 CONNECTION_TIMEOUT_SECONDS
订阅后 TimeoutException
秒后开始接收,而不是自项目发布后。 如何解决?
需要你的帮助。提前致谢!
要获得您想要的行为,您只需要在 switchMap
块中进行小的更改。所以,我将你的 apply
实现修改成了这个
override fun
apply(upstreamFlowable : Flowable<T>) : Publisher<R>
{
return upstreamFlowable
.compose(RxFlowableOnIo<T>())
.filter {emittedItem : T ->
condition.test(emittedItem)
}
.switchMap {
Flowable.just(it)
.switchMap { emittedItem : T ->
val anotherFlowable = switchFunction(emittedItem)
anotherFlowable
}
.timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS)
}
}
通过这样做,timeout
将在发出值时开始计数。
更新
为了进行测试,我准备了简单的代码并添加了日志:
- upstream 将在 2 秒后发射一次
- anotherFlowable 将在 1 秒后发射一次
- 超时为 500 毫秒
// 示例用法
val upstream = Flowable.timer(2, TimeUnit.SECONDS)
upstream
.doOnSubscribe { println("subscribed ${System.currentTimeMillis()}") }
.compose(
RxSwitchFlowablesOnCondition<Long, Long>(
Predicate { true },
{ Flowable.timer(1, TimeUnit.SECONDS)})
)
.doOnError { println("timeout ${System.currentTimeMillis()}") }
.retry()
.subscribe()
// RxSwitchFlowablesOnCondition
open class
RxSwitchFlowablesOnCondition<T, R>(private val condition : Predicate<T>,
private val switchFunction : (T) -> Flowable<R>) : FlowableTransformer<T, R>
{
override fun
apply(upstreamFlowable : Flowable<T>) : Publisher<R>
{
return upstreamFlowable
.observeOn(Schedulers.io())
.filter {emittedItem : T ->
condition.test(emittedItem)
}
.switchMap {
Flowable.just(it)
.doOnNext { println("onNext ${System.currentTimeMillis()}") }
.switchMap { emittedItem : T ->
val anotherFlowable = switchFunction(emittedItem)
anotherFlowable
}
.timeout(500, TimeUnit.MILLISECONDS)
}
}
}
这是示例输出日志
subscribed 1569286225769
onNext 1569286227790
timeout 1569286228292 // around 500 ms after onNext
subscribed 1569286228295
onNext 1569286230297
timeout 1569286230800
subscribed 1569286230800
onNext 1569286232803
timeout 1569286233306
subscribed 1569286233306
是否符合您的预期?
我正在尝试扩展我的自定义 FlowableTransformer
的功能,我用它有条件地从一个 Flowable
切换到另一个,由提供的 Function<T, Flowable<R>>
返回,即在我的 [=15] =] 如果初始项目符合某些条件,我将项目发送到服务器并切换到可流动的服务器响应:
/**
* Switches from upstreamFlowable to anotherFlowable
* using switchFunction if item from upstreamFlowable
* matches test condition
*
* usage:
* upstreamFlowable
* .compose(RxSwitchFlowablesOnCondition(condition, switchFunction))
**/
open class
RxSwitchFlowablesOnCondition<T, R>(private val condition : Predicate<T>,
private val switchFunction : Function<T, Flowable<R>>) : FlowableTransformer<T, R>
{
override fun
apply(upstreamFlowable : Flowable<T>) : Publisher<R>
{
return upstreamFlowable
.compose(RxFlowableOnIo<T>())
.filter {emittedItem : T ->
condition.test(emittedItem)
}
.switchMap {emittedItem : T ->
val anotherFlowable = switchFunction.apply(emittedItem)
anotherFlowable
//.timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS) // #1
}
//.timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS) // #2
}
}
要在发生错误时重新启动链,在我的 android 应用程序中,我使用 retry
运算符:
upstreamFlowable
.compose(RxSwitchFlowablesOnCondition(condition, switchFunction))
.retry() // to resubscribe on error
.subscribe(...)
我想做的是添加timeout
逻辑,例如,如果upstreamFlowable
发出一个项目和在anotherFlowable
获得响应之间的时间大于CONNECTION_TIMEOUT_SECONDS
,我想触发错误并重新订阅 upstreamFlowable
,这样我就可以等待下一个 emittedItem
。
我有:
- 如果我取消对行
#1
的注释,则超时会产生一个TimeoutException
,重试重新订阅链,我会继续每隔CONNECTION_TIMEOUT_SECONDS
时间段收到超时异常,即使没有下一个发出的项目. 如何跳过它们? - 如果我取消注释行
#2
,那么我在CONNECTION_TIMEOUT_SECONDS
订阅后TimeoutException
秒后开始接收,而不是自项目发布后。 如何解决?
需要你的帮助。提前致谢!
要获得您想要的行为,您只需要在 switchMap
块中进行小的更改。所以,我将你的 apply
实现修改成了这个
override fun
apply(upstreamFlowable : Flowable<T>) : Publisher<R>
{
return upstreamFlowable
.compose(RxFlowableOnIo<T>())
.filter {emittedItem : T ->
condition.test(emittedItem)
}
.switchMap {
Flowable.just(it)
.switchMap { emittedItem : T ->
val anotherFlowable = switchFunction(emittedItem)
anotherFlowable
}
.timeout(CONNECTION_TIMEOUT_SECONDS, SECONDS)
}
}
通过这样做,timeout
将在发出值时开始计数。
更新 为了进行测试,我准备了简单的代码并添加了日志:
- upstream 将在 2 秒后发射一次
- anotherFlowable 将在 1 秒后发射一次
- 超时为 500 毫秒
// 示例用法
val upstream = Flowable.timer(2, TimeUnit.SECONDS)
upstream
.doOnSubscribe { println("subscribed ${System.currentTimeMillis()}") }
.compose(
RxSwitchFlowablesOnCondition<Long, Long>(
Predicate { true },
{ Flowable.timer(1, TimeUnit.SECONDS)})
)
.doOnError { println("timeout ${System.currentTimeMillis()}") }
.retry()
.subscribe()
// RxSwitchFlowablesOnCondition
open class
RxSwitchFlowablesOnCondition<T, R>(private val condition : Predicate<T>,
private val switchFunction : (T) -> Flowable<R>) : FlowableTransformer<T, R>
{
override fun
apply(upstreamFlowable : Flowable<T>) : Publisher<R>
{
return upstreamFlowable
.observeOn(Schedulers.io())
.filter {emittedItem : T ->
condition.test(emittedItem)
}
.switchMap {
Flowable.just(it)
.doOnNext { println("onNext ${System.currentTimeMillis()}") }
.switchMap { emittedItem : T ->
val anotherFlowable = switchFunction(emittedItem)
anotherFlowable
}
.timeout(500, TimeUnit.MILLISECONDS)
}
}
}
这是示例输出日志
subscribed 1569286225769
onNext 1569286227790
timeout 1569286228292 // around 500 ms after onNext
subscribed 1569286228295
onNext 1569286230297
timeout 1569286230800
subscribed 1569286230800
onNext 1569286232803
timeout 1569286233306
subscribed 1569286233306
是否符合您的预期?