使用 `onBackpressureLatest` 丢弃阻塞 Flowable 中的中间消息

Using `onBackpressureLatest` to drop intermediate messages in blocking Flowable

我有一个链,我在其中进行一些阻塞 IO 调用(例如 HTTP 调用)。我希望阻塞调用消耗一个值,不中断地继续,但同时丢弃所有堆积的东西,然后以相同的方式消耗下一个值。

考虑以下示例:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
    Thread.sleep(1000)
    it
  }.blockingForEach { println(it) }
}

从天真的角度来看,我希望打印出类似 0, 10, 20, ... 的内容,但它打印出 0, 1, 2, ....

我做错了什么?

编辑:

我想天真地添加 debounce 来吃掉传入的流:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .debounce(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

但是,现在我得到了 java.lang.InterruptedException: sleep interrupted

编辑:

以下似乎有效:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .throttleLast(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

输出符合预期0, 10, 20, ...!!

这是正确的方法吗?

我注意到 throttleLast 将切换到 Computation-Scheduler。有没有办法回到原来的调度器?

编辑:

我偶尔也会 java.lang.InterruptedException: sleep interrupted 使用该变体。

解决问题最简单的方法是:

fun <T> Flowable<T>.lossy() : Flowable<T> {
  return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}

通过在 Flowable 上调用 lossy,它开始丢弃所有传入速度快于下游消费者处理速度的元素。