project-reactor API 中 Flux::sampleTimeout 方法的目的是什么?
What is the purpose of Flux::sampleTimeout method in the project-reactor API?
Java 文档说明如下:
Emit the last value from this Flux only if there were no new values emitted during the time window provided by a publisher for that particular last value.
然而我发现上面的描述令人困惑。我在 gitter chat 中读到它类似于 RxJava 中的去抖动。有人可以举例说明吗?经过彻底搜索后,我在任何地方都找不到这个。
sampleTimeout
允许您将伴随 Flux
X'
关联到源中的每个传入值 x
。如果 X'
在源中发出下一个值之前完成,则发出值 x
。如果不是,则删除 x
。
对后续值应用相同的处理。
将其视为将原始序列拆分为 windows,由每个伴随通量的开始和完成分隔。如果两个 windows 重叠,触发第一个的值将被丢弃。
另一方面,您有 sample(Duration)
,它只处理一个同伴 Flux。它将序列拆分为连续的 windows,在固定的时间段内,并在特定的 window.
期间丢弃除最后一个元素之外的所有元素
(编辑):关于您的用例
如果我没理解错的话,您似乎有一个要定期安排的不同长度的处理,但您也不想考虑处理时间超过一个 周期的值?
如果是这样,听起来您想 1) 使用 publishOn
将您的处理隔离在自己的线程中,以及 2) 只需要 sample(Duration)
来满足要求的第二部分(分配的延迟任务不变)。
像这样:
List<Long> passed =
//regular scheduling:
Flux.interval(Duration.ofMillis(200))
//this is only to show that processing is indeed started regularly
.elapsed()
//this is to isolate the blocking processing
.publishOn(Schedulers.elastic())
//blocking processing itself
.map(tuple -> {
long l = tuple.getT2();
int sleep = l % 2 == 0 || l % 5 == 0 ? 100 : 210;
System.out.println(tuple.getT1() + "ms later - " + tuple.getT2() + ": sleeping for " + sleep + "ms");
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
return l;
})
//this is where we say "drop if too long"
.sample(Duration.ofMillis(200))
//the rest is to make it finite and print the processed values that passed
.take(10)
.collectList()
.block();
System.out.println(passed);
输出:
205ms later - 0: sleeping for 100ms
201ms later - 1: sleeping for 210ms
200ms later - 2: sleeping for 100ms
199ms later - 3: sleeping for 210ms
201ms later - 4: sleeping for 100ms
200ms later - 5: sleeping for 100ms
201ms later - 6: sleeping for 100ms
196ms later - 7: sleeping for 210ms
204ms later - 8: sleeping for 100ms
198ms later - 9: sleeping for 210ms
201ms later - 10: sleeping for 100ms
196ms later - 11: sleeping for 210ms
200ms later - 12: sleeping for 100ms
202ms later - 13: sleeping for 210ms
202ms later - 14: sleeping for 100ms
200ms later - 15: sleeping for 100ms
[0, 2, 4, 5, 6, 8, 10, 12, 14, 15]
所以阻塞处理大约每200ms触发一次,只保留200ms内处理的值。
Java 文档说明如下:
Emit the last value from this Flux only if there were no new values emitted during the time window provided by a publisher for that particular last value.
然而我发现上面的描述令人困惑。我在 gitter chat 中读到它类似于 RxJava 中的去抖动。有人可以举例说明吗?经过彻底搜索后,我在任何地方都找不到这个。
sampleTimeout
允许您将伴随 Flux
X'
关联到源中的每个传入值 x
。如果 X'
在源中发出下一个值之前完成,则发出值 x
。如果不是,则删除 x
。
对后续值应用相同的处理。
将其视为将原始序列拆分为 windows,由每个伴随通量的开始和完成分隔。如果两个 windows 重叠,触发第一个的值将被丢弃。
另一方面,您有 sample(Duration)
,它只处理一个同伴 Flux。它将序列拆分为连续的 windows,在固定的时间段内,并在特定的 window.
(编辑):关于您的用例
如果我没理解错的话,您似乎有一个要定期安排的不同长度的处理,但您也不想考虑处理时间超过一个 周期的值?
如果是这样,听起来您想 1) 使用 publishOn
将您的处理隔离在自己的线程中,以及 2) 只需要 sample(Duration)
来满足要求的第二部分(分配的延迟任务不变)。
像这样:
List<Long> passed =
//regular scheduling:
Flux.interval(Duration.ofMillis(200))
//this is only to show that processing is indeed started regularly
.elapsed()
//this is to isolate the blocking processing
.publishOn(Schedulers.elastic())
//blocking processing itself
.map(tuple -> {
long l = tuple.getT2();
int sleep = l % 2 == 0 || l % 5 == 0 ? 100 : 210;
System.out.println(tuple.getT1() + "ms later - " + tuple.getT2() + ": sleeping for " + sleep + "ms");
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
return l;
})
//this is where we say "drop if too long"
.sample(Duration.ofMillis(200))
//the rest is to make it finite and print the processed values that passed
.take(10)
.collectList()
.block();
System.out.println(passed);
输出:
205ms later - 0: sleeping for 100ms
201ms later - 1: sleeping for 210ms
200ms later - 2: sleeping for 100ms
199ms later - 3: sleeping for 210ms
201ms later - 4: sleeping for 100ms
200ms later - 5: sleeping for 100ms
201ms later - 6: sleeping for 100ms
196ms later - 7: sleeping for 210ms
204ms later - 8: sleeping for 100ms
198ms later - 9: sleeping for 210ms
201ms later - 10: sleeping for 100ms
196ms later - 11: sleeping for 210ms
200ms later - 12: sleeping for 100ms
202ms later - 13: sleeping for 210ms
202ms later - 14: sleeping for 100ms
200ms later - 15: sleeping for 100ms
[0, 2, 4, 5, 6, 8, 10, 12, 14, 15]
所以阻塞处理大约每200ms触发一次,只保留200ms内处理的值。