RxJava 上的运算符 Observable/Flowable 延迟发射 n 项

Operator on RxJava Observable/Flowable to delay emission by n items

我想转换 Flowable 以便它推迟发射项目,直到收集到指定数量的项目,然后以 FIFO 顺序将它们发射到下游,保持恒定的延迟项目计数。一旦上游发出 onComplete 信号,排队的项目应在发出 onComplete 之前刷新到下游:

(在本例中延迟的项目编号为 3)

1 2 3 4 5 6 7 |
      1 2 3 4 5 6 7 |

我没有看到任何现有的运算符执行此操作或可以修改以获得该行为。 Observable.delay 似乎只支持基于时间的延迟,不支持基于计数的延迟。

实现自定义运算符应该很容易,但也许现有运算符有更简单的方法?

您可以发布序列,跳过最后 N 个,然后将最后 N 个追加回来:

Flowable.range(1, 7)
    .flatMap(v -> Flowable.timer(v * 200, TimeUnit.MILLISECONDS).map(w -> v))
    .doOnNext(v -> System.out.println(v))
// -------------------------------------------------------------------
    .publish(f -> 
        f.skipLast(3).mergeWith(f.takeLast(3))
    )
// -------------------------------------------------------------------
    .blockingSubscribe(v -> System.out.println("<-- " + v));