使用 `onBackpressureLatest` 丢弃阻塞 Flowable 中的中间消息
Using `onBackpressureLatest` to drop intermediate messages in blocking Flowable
我有一个链,我在其中进行一些阻塞 IO 调用(例如 HTTP 调用)。我希望阻塞调用消耗一个值,不中断地继续,但同时丢弃所有堆积的东西,然后以相同的方式消耗下一个值。
考虑以下示例:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
Thread.sleep(1000)
it
}.blockingForEach { println(it) }
}
从天真的角度来看,我希望打印出类似 0, 10, 20, ...
的内容,但它打印出 0, 1, 2, ...
.
我做错了什么?
编辑:
我想天真地添加 debounce
来吃掉传入的流:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.debounce(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
但是,现在我得到了 java.lang.InterruptedException: sleep interrupted
。
编辑:
以下似乎有效:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.throttleLast(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
输出符合预期0, 10, 20, ...
!!
这是正确的方法吗?
我注意到 throttleLast
将切换到 Computation-Scheduler。有没有办法回到原来的调度器?
编辑:
我偶尔也会 java.lang.InterruptedException: sleep interrupted
使用该变体。
解决问题最简单的方法是:
fun <T> Flowable<T>.lossy() : Flowable<T> {
return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}
通过在 Flowable
上调用 lossy
,它开始丢弃所有传入速度快于下游消费者处理速度的元素。
我有一个链,我在其中进行一些阻塞 IO 调用(例如 HTTP 调用)。我希望阻塞调用消耗一个值,不中断地继续,但同时丢弃所有堆积的东西,然后以相同的方式消耗下一个值。
考虑以下示例:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
Thread.sleep(1000)
it
}.blockingForEach { println(it) }
}
从天真的角度来看,我希望打印出类似 0, 10, 20, ...
的内容,但它打印出 0, 1, 2, ...
.
我做错了什么?
编辑:
我想天真地添加 debounce
来吃掉传入的流:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.debounce(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
但是,现在我得到了 java.lang.InterruptedException: sleep interrupted
。
编辑:
以下似乎有效:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.throttleLast(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
输出符合预期0, 10, 20, ...
!!
这是正确的方法吗?
我注意到 throttleLast
将切换到 Computation-Scheduler。有没有办法回到原来的调度器?
编辑:
我偶尔也会 java.lang.InterruptedException: sleep interrupted
使用该变体。
解决问题最简单的方法是:
fun <T> Flowable<T>.lossy() : Flowable<T> {
return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}
通过在 Flowable
上调用 lossy
,它开始丢弃所有传入速度快于下游消费者处理速度的元素。