Flux bufferMillis - 如何测试?
Flux bufferMillis - How to test?
我正在尝试使用虚拟调度程序测试 Flux.bufferMillis
,但它似乎间歇性地失败了。
以下是
的代码
StepVerifier.withVirtualTime(new Supplier() {
Flux get() {
return Flux.interval(1 second).take(5).bufferMillis(1500);
}}
)
.thenAwait(5 seconds)
.expectNext(...)
.verifyComplete();
我间歇性的报错如下:
java.lang.AssertionError: expectation "expectNextMatches" failed (expected: onNext(); actual: onError(java.util.concurrent.RejectedExecutionException: Scheduler unavailable))
我不确定为什么调度程序偶尔不可用。
当您只提供 Supplier<Flux>
和 verify()
时,StepVerifier
和虚拟时间大致以这种方式工作:
StepVerifier
获得一个 "virtual time scheduler" 实例。
- 它将核心的
SchedulerFactory
替换为 return 所有需要默认调度程序的操作员的虚拟时间调度程序。
- 调用供应商创建运营商链进行测试,订阅,然后播放验证场景。
- 最后,它清理并关闭了它使用的虚拟调度程序。
这就是运行并行测试时会出现问题的地方:因为只有1个工厂,并且由于获取VTS的方式,有2个并行虚拟时间测试会导致使用相同的 VirtualTimeScheduler
实例在后台。所以第一个测试将关闭调度程序,第二个测试将是 REJECTED
.
自 gradle-scalatest
runs in parallel by default 以来,这很可能是您问题的根源。
我正在尝试使用虚拟调度程序测试 Flux.bufferMillis
,但它似乎间歇性地失败了。
以下是
StepVerifier.withVirtualTime(new Supplier() {
Flux get() {
return Flux.interval(1 second).take(5).bufferMillis(1500);
}}
)
.thenAwait(5 seconds)
.expectNext(...)
.verifyComplete();
我间歇性的报错如下:
java.lang.AssertionError: expectation "expectNextMatches" failed (expected: onNext(); actual: onError(java.util.concurrent.RejectedExecutionException: Scheduler unavailable))
我不确定为什么调度程序偶尔不可用。
Supplier<Flux>
和 verify()
时,StepVerifier
和虚拟时间大致以这种方式工作:
StepVerifier
获得一个 "virtual time scheduler" 实例。- 它将核心的
SchedulerFactory
替换为 return 所有需要默认调度程序的操作员的虚拟时间调度程序。 - 调用供应商创建运营商链进行测试,订阅,然后播放验证场景。
- 最后,它清理并关闭了它使用的虚拟调度程序。
这就是运行并行测试时会出现问题的地方:因为只有1个工厂,并且由于获取VTS的方式,有2个并行虚拟时间测试会导致使用相同的 VirtualTimeScheduler
实例在后台。所以第一个测试将关闭调度程序,第二个测试将是 REJECTED
.
自 gradle-scalatest
runs in parallel by default 以来,这很可能是您问题的根源。