RxJava:如何让 "generate" 运算符向下游延迟发射无限流 "concatMap"

RxJava: how to make "generate" operator emit infinite stream lazily to downstream "concatMap"

我想知道为什么下面代码段中的 generate 运算符导致 的“已接收 X”行在“获取页面”之间打印。文档表明 downstream 将要求发射器发射并且 concatMap 应该在内部源完成后要求下一个值(我猜)。知道在这种情况下如何使生成不急切吗?

  Observable
    .generate(
      { 0 },
      BiFunction { s: Int, emitter: Emitter<Int> -> emitter.onNext(s); s + 1 }
    )
    .doOnNext { println("received $it") }
    .concatMapSingle { page ->
      Single
        .just(if (page == 3) emptyList() else listOf(page))
        .delay(100, TimeUnit.MILLISECONDS)
    }
    .doOnNext { println("fetched a page") }
    .takeWhile { it.isNotEmpty() }
    .blockingSubscribe()

这会打印

received 1
received 2
...
received 1000
fetched page
received 1001
received 1002
...
received 3000
fetched page
...

在RxJava2中,The Observable class does not support backpressure,这意味着你无法控制上游算子的发射率。请改用 Flowable,您应该会看到预期的行为。