使用 RxJava2 创建带有生成函数的流
Create a flowable with generate function using RxJava2
我需要创建一个实现了背压的自定义 Flowable。我正在尝试实现某种分页。这意味着当下游请求 5 个项目时,我将为项目 0 - 5 "ask the data source"。然后当下游需要另外 5 个时,我将获得项目 5 - 10 并返回。
到目前为止我发现的最好的方法是使用 Flowable.generate
方法,但我真的不明白为什么没有办法(据我所知)如何获得 requested
下游请求的项目数。我可以使用生成器的 state
属性 来保存最后请求的项目的索引,这样我只需要新请求的项目的数量。我在 BiFunction apply
中得到的 emmiter 实例是 GeneratorSubscription
,它是从 AtomicLong
延伸而来的。因此,将 emmiter 投射到 AtomicLong
可以获得请求的数字。但我知道这不可能是 "recommended" 方式。
另一方面,当您使用 Flowable.create
时,您会得到具有 long requested()
方法的 FlowableEmitter。使用 generate
更适合我的用例,但现在我也很好奇 "correct" 使用 Flowable.generate
.
的方式是什么
也许我对整件事想得太多了,所以请指出正确的方向。 :) 谢谢。
这是实际代码的样子(在 Kotlin 中):
Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter ->
val requested = (emitter as AtomicLong).get().toInt() //this is bull*hit
val end = start + requested
//get items [start to end] -> items
emmiter.onNext(items)
end /*return the new state*/
})
好的,我发现 BiFunction 的 apply
函数被调用了很多次,请求数量 (n) 也是如此。所以没有理由为它设置 getter 。这不是我所希望的,但显然 generate
是这样工作的。 :)
我需要创建一个实现了背压的自定义 Flowable。我正在尝试实现某种分页。这意味着当下游请求 5 个项目时,我将为项目 0 - 5 "ask the data source"。然后当下游需要另外 5 个时,我将获得项目 5 - 10 并返回。
到目前为止我发现的最好的方法是使用 Flowable.generate
方法,但我真的不明白为什么没有办法(据我所知)如何获得 requested
下游请求的项目数。我可以使用生成器的 state
属性 来保存最后请求的项目的索引,这样我只需要新请求的项目的数量。我在 BiFunction apply
中得到的 emmiter 实例是 GeneratorSubscription
,它是从 AtomicLong
延伸而来的。因此,将 emmiter 投射到 AtomicLong
可以获得请求的数字。但我知道这不可能是 "recommended" 方式。
另一方面,当您使用 Flowable.create
时,您会得到具有 long requested()
方法的 FlowableEmitter。使用 generate
更适合我的用例,但现在我也很好奇 "correct" 使用 Flowable.generate
.
也许我对整件事想得太多了,所以请指出正确的方向。 :) 谢谢。
这是实际代码的样子(在 Kotlin 中):
Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter ->
val requested = (emitter as AtomicLong).get().toInt() //this is bull*hit
val end = start + requested
//get items [start to end] -> items
emmiter.onNext(items)
end /*return the new state*/
})
好的,我发现 BiFunction 的 apply
函数被调用了很多次,请求数量 (n) 也是如此。所以没有理由为它设置 getter 。这不是我所希望的,但显然 generate
是这样工作的。 :)