project-reactor API 中 Flux::sampleTimeout 方法的目的是什么?

What is the purpose of Flux::sampleTimeout method in the project-reactor API?

Java 文档说明如下:

Emit the last value from this Flux only if there were no new values emitted during the time window provided by a publisher for that particular last value.

然而我发现上面的描述令人困惑。我在 gitter chat 中读到它类似于 RxJava 中的去抖动。有人可以举例说明吗?经过彻底搜索后,我在任何地方都找不到这个。

sampleTimeout 允许您将伴随 Flux X' 关联到源中的每个传入值 x。如果 X' 在源中发出下一个值之前完成,则发出值 x。如果不是,则删除 x。 对后续值应用相同的处理。

将其视为将原始序列拆分为 windows,由每个伴随通量的开始和完成分隔。如果两个 windows 重叠,触发第一个的值将被丢弃。

另一方面,您有 sample(Duration),它只处理一个同伴 Flux。它将序列拆分为连续的 windows,在固定的时间段内,并在特定的 window.

期间丢弃除最后一个元素之外的所有元素

(编辑):关于您的用例

如果我没理解错的话,您似乎有一个要定期安排的不同长度的处理,但您也不想考虑处理时间超过一个 周期的值?

如果是这样,听起来您想 1) 使用 publishOn 将您的处理隔离在自己的线程中,以及 2) 只需要 sample(Duration) 来满足要求的第二部分(分配的延迟任务不变)。

像这样:

List<Long> passed =
      //regular scheduling:
    Flux.interval(Duration.ofMillis(200))
      //this is only to show that processing is indeed started regularly
    .elapsed()
      //this is to isolate the blocking processing
    .publishOn(Schedulers.elastic())
      //blocking processing itself
    .map(tuple -> {
        long l = tuple.getT2();
        int sleep = l % 2 == 0 || l % 5 == 0 ? 100 : 210;
        System.out.println(tuple.getT1() + "ms later - " + tuple.getT2() + ": sleeping for " + sleep + "ms");
        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return l;
    })
      //this is where we say "drop if too long"
    .sample(Duration.ofMillis(200))
      //the rest is to make it finite and print the processed values that passed
    .take(10)
    .collectList()
    .block();

System.out.println(passed);

输出:

205ms later - 0: sleeping for 100ms
201ms later - 1: sleeping for 210ms
200ms later - 2: sleeping for 100ms
199ms later - 3: sleeping for 210ms
201ms later - 4: sleeping for 100ms
200ms later - 5: sleeping for 100ms
201ms later - 6: sleeping for 100ms
196ms later - 7: sleeping for 210ms
204ms later - 8: sleeping for 100ms
198ms later - 9: sleeping for 210ms
201ms later - 10: sleeping for 100ms
196ms later - 11: sleeping for 210ms
200ms later - 12: sleeping for 100ms
202ms later - 13: sleeping for 210ms
202ms later - 14: sleeping for 100ms
200ms later - 15: sleeping for 100ms
[0, 2, 4, 5, 6, 8, 10, 12, 14, 15]

所以阻塞处理大约每200ms触发一次,只保留200ms内处理的值。