RxJava 上的运算符 Observable/Flowable 延迟发射 n 项
Operator on RxJava Observable/Flowable to delay emission by n items
我想转换 Flowable
以便它推迟发射项目,直到收集到指定数量的项目,然后以 FIFO 顺序将它们发射到下游,保持恒定的延迟项目计数。一旦上游发出 onComplete 信号,排队的项目应在发出 onComplete 之前刷新到下游:
(在本例中延迟的项目编号为 3)
1 2 3 4 5 6 7 |
1 2 3 4 5 6 7 |
我没有看到任何现有的运算符执行此操作或可以修改以获得该行为。 Observable.delay
似乎只支持基于时间的延迟,不支持基于计数的延迟。
实现自定义运算符应该很容易,但也许现有运算符有更简单的方法?
您可以发布序列,跳过最后 N 个,然后将最后 N 个追加回来:
Flowable.range(1, 7)
.flatMap(v -> Flowable.timer(v * 200, TimeUnit.MILLISECONDS).map(w -> v))
.doOnNext(v -> System.out.println(v))
// -------------------------------------------------------------------
.publish(f ->
f.skipLast(3).mergeWith(f.takeLast(3))
)
// -------------------------------------------------------------------
.blockingSubscribe(v -> System.out.println("<-- " + v));
我想转换 Flowable
以便它推迟发射项目,直到收集到指定数量的项目,然后以 FIFO 顺序将它们发射到下游,保持恒定的延迟项目计数。一旦上游发出 onComplete 信号,排队的项目应在发出 onComplete 之前刷新到下游:
(在本例中延迟的项目编号为 3)
1 2 3 4 5 6 7 |
1 2 3 4 5 6 7 |
我没有看到任何现有的运算符执行此操作或可以修改以获得该行为。 Observable.delay
似乎只支持基于时间的延迟,不支持基于计数的延迟。
实现自定义运算符应该很容易,但也许现有运算符有更简单的方法?
您可以发布序列,跳过最后 N 个,然后将最后 N 个追加回来:
Flowable.range(1, 7)
.flatMap(v -> Flowable.timer(v * 200, TimeUnit.MILLISECONDS).map(w -> v))
.doOnNext(v -> System.out.println(v))
// -------------------------------------------------------------------
.publish(f ->
f.skipLast(3).mergeWith(f.takeLast(3))
)
// -------------------------------------------------------------------
.blockingSubscribe(v -> System.out.println("<-- " + v));