RxJava:如何让 "generate" 运算符向下游延迟发射无限流 "concatMap"
RxJava: how to make "generate" operator emit infinite stream lazily to downstream "concatMap"
我想知道为什么下面代码段中的 generate
运算符导致 吨 的“已接收 X”行在“获取页面”之间打印。文档表明 downstream 将要求发射器发射并且 concatMap
应该在内部源完成后要求下一个值(我猜)。知道在这种情况下如何使生成不急切吗?
Observable
.generate(
{ 0 },
BiFunction { s: Int, emitter: Emitter<Int> -> emitter.onNext(s); s + 1 }
)
.doOnNext { println("received $it") }
.concatMapSingle { page ->
Single
.just(if (page == 3) emptyList() else listOf(page))
.delay(100, TimeUnit.MILLISECONDS)
}
.doOnNext { println("fetched a page") }
.takeWhile { it.isNotEmpty() }
.blockingSubscribe()
这会打印
received 1
received 2
...
received 1000
fetched page
received 1001
received 1002
...
received 3000
fetched page
...
在RxJava2中,The Observable
class does not support backpressure,这意味着你无法控制上游算子的发射率。请改用 Flowable
,您应该会看到预期的行为。
我想知道为什么下面代码段中的 generate
运算符导致 吨 的“已接收 X”行在“获取页面”之间打印。文档表明 downstream 将要求发射器发射并且 concatMap
应该在内部源完成后要求下一个值(我猜)。知道在这种情况下如何使生成不急切吗?
Observable
.generate(
{ 0 },
BiFunction { s: Int, emitter: Emitter<Int> -> emitter.onNext(s); s + 1 }
)
.doOnNext { println("received $it") }
.concatMapSingle { page ->
Single
.just(if (page == 3) emptyList() else listOf(page))
.delay(100, TimeUnit.MILLISECONDS)
}
.doOnNext { println("fetched a page") }
.takeWhile { it.isNotEmpty() }
.blockingSubscribe()
这会打印
received 1
received 2
...
received 1000
fetched page
received 1001
received 1002
...
received 3000
fetched page
...
在RxJava2中,The Observable
class does not support backpressure,这意味着你无法控制上游算子的发射率。请改用 Flowable
,您应该会看到预期的行为。