RxJava Flowable.Interval 单平面图时的背压
RxJava Flowable.Interval backpressure when flatmap with single
我有一个场景,我需要定期调用 API 来检查结果。我正在使用 Flowable.interval
创建一个调用 API.
的区间函数
但是,我遇到了背压问题。在我下面的示例中,在间隔中的每个刻度上创建一个新的单曲。期望的效果是仅在调用尚未进行时调用 API
Flowable.interval(1, 1, TimeUnit.SECONDS).flatMap {
System.out.println("Delay $it")
//simulates API call
Single.just(1L).doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE SUCCESS!!!")
}.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()
我可以像这样使用过滤器变量来解决这个问题:
var filter = true
Flowable.interval(1, 1, TimeUnit.SECONDS).filter {
filter
}.flatMap {
System.out.println("Delay $it")
Single.just(1L).doOnSubscribe {
filter = true
}.doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE!!!")
filter = true
}.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()
但这似乎是一个 hacky 解决方案。我已经厌倦了在 interval
函数之后应用 onBackPressureDrop
,但它没有任何效果。
有什么建议吗?
您还必须限制 flatMap
:
Flowable.interval(1, 1, TimeUnit.SECONDS)
.onBackpressureDrop()
.flatMapSingle({
System.out.println("Delay $it")
//simulates API call
Single.just(1L).doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE SUCCESS!!!")
}
}, false, 1) // <----------------------------------------------------------
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe()
我有一个场景,我需要定期调用 API 来检查结果。我正在使用 Flowable.interval
创建一个调用 API.
但是,我遇到了背压问题。在我下面的示例中,在间隔中的每个刻度上创建一个新的单曲。期望的效果是仅在调用尚未进行时调用 API
Flowable.interval(1, 1, TimeUnit.SECONDS).flatMap {
System.out.println("Delay $it")
//simulates API call
Single.just(1L).doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE SUCCESS!!!")
}.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()
我可以像这样使用过滤器变量来解决这个问题:
var filter = true
Flowable.interval(1, 1, TimeUnit.SECONDS).filter {
filter
}.flatMap {
System.out.println("Delay $it")
Single.just(1L).doOnSubscribe {
filter = true
}.doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE!!!")
filter = true
}.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()
但这似乎是一个 hacky 解决方案。我已经厌倦了在 interval
函数之后应用 onBackPressureDrop
,但它没有任何效果。
有什么建议吗?
您还必须限制 flatMap
:
Flowable.interval(1, 1, TimeUnit.SECONDS)
.onBackpressureDrop()
.flatMapSingle({
System.out.println("Delay $it")
//simulates API call
Single.just(1L).doAfterSuccess {
System.out.println("NEW SINGLE!!!")
}.delay(4, TimeUnit.SECONDS).doAfterSuccess {
System.out.println("SINGLE SUCCESS!!!")
}
}, false, 1) // <----------------------------------------------------------
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe()