Flux bufferMillis - 如何测试?

Flux bufferMillis - How to test?

我正在尝试使用虚拟调度程序测试 Flux.bufferMillis,但它似乎间歇性地失败了。 以下是

的代码
StepVerifier.withVirtualTime(new Supplier() {
    Flux get() {
        return Flux.interval(1 second).take(5).bufferMillis(1500);
    }}
)
    .thenAwait(5 seconds)
    .expectNext(...)
    .verifyComplete();

我间歇性的报错如下:

java.lang.AssertionError: expectation "expectNextMatches" failed (expected: onNext(); actual: onError(java.util.concurrent.RejectedExecutionException: Scheduler unavailable))

我不确定为什么调度程序偶尔不可用。

当您只提供 Supplier<Flux>verify() 时,

StepVerifier 和虚拟时间大致以这种方式工作:

  1. StepVerifier 获得一个 "virtual time scheduler" 实例。
  2. 它将核心的 SchedulerFactory 替换为 return 所有需要默认调度程序的操作员的虚拟时间调度程序。
  3. 调用供应商创建运营商链进行测试,订阅,然后播放验证场景。
  4. 最后,它清理并关闭了它使用的虚拟调度程序。

这就是运行并行测试时会出现问题的地方:因为只有1个工厂,并且由于获取VTS的方式,有2个并行虚拟时间测试会导致使用相同的 VirtualTimeScheduler 实例在后台。所以第一个测试将关闭调度程序,第二个测试将是 REJECTED.

gradle-scalatest runs in parallel by default 以来,这很可能是您问题的根源。