使用 StepVerifier 对 Flux.take(Duration duration) 进行单元测试

Unit-testing a Flux.take(Duration duration) with StepVerifier

我正在使用 Spring Reactor Core 3.0.6 并且我有一个方法 returning a Flux:

public Flux<Foo> createFlux(){
    return Flux.<List<Foo>,String>generate(/* generator omitted for clarity's sake */ )
        .take(Duration.ofSeconds(10)
        .flatMap(Flux::fromIterable);
}

生成器函数调用分页 REST api 来获取结果,如果 API 继续 return 数据,我希望 Flux 只是 运行持续 10 秒。

它工作正常,但我想创建一些单元测试,但我在创建测试以验证 Flux 运行 最多仅持续 10 秒时遇到了麻烦。

我嘲笑了休息服务,所以它总是 return 数据并写了这个:

StepVerifier.withVirtualTime(() -> createFlux())
    .thenAwait(Duration.ofSeconds(10))
    .verifyComplete();

但失败了:

java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext([my toString() Foo bean]))

我想我应该以某种方式使用生成的项目,但我无法找到正确的 StepVerifier 方法来这样做。

编辑

我尝试跳过 thenConsumeWhile 的所有项目:

StepVerifier.withVirtualTime(() -> createFlux())
    .thenAwait(Duration.ofSeconds(10))
    .thenConsumeWhile(t -> true)
    .verifyComplete();

但现在测试只是 运行 无限期并且永远不会结束。

也许 reference guide 可以让您走上正确的道路,如果您错过了它?

除了最常见的 expectNext,您必须对序列中的每个项目重复,如果您知道元素的数量,您可以使用 expectNextCountthenConsumeWhile根据谓词跳过元素,

生成器实际上可能非常重要...StepVerifier 受限于无限序列,在使用虚拟时间时更是如此。问题在于主线程中的生成器和 thenAwait 运行,因此生成器是无限的会阻止 stepverifier 提前时间,从而防止序列超时。

既然您想测试 take 的持续时间,我认为虚拟时间不合适(您正在测试模拟时间)。我会让 createFlux 方法可以用拍摄持续时间参数化,并做一个 StepVerifier.create(),持续时间要短得多。

如果你真的想使用某种形式的虚拟时间,我发现让它工作的最低要求是

  1. 通过在测试开始时实例化 Scheduler 非虚拟 线程上隔离生成器循环,然后在 StepVerifier 的 Supplier.
  2. 首先调用 .expectNextCount(1),确保所有内容都已订阅且数据开始流动,然后再尝试提前。

像这样:

public Flux<Integer> createFlux() {
    return Flux.<List<Integer>>generate(sink -> {
        sink.next(Arrays.asList(1, 2, 3));
    })
            .take(Duration.ofSeconds(10))
            .flatMap(Flux::fromIterable);
}

@Test
public void so44657525() throws InterruptedException {
    Scheduler scheduler = Schedulers.newSingle("test");
    AtomicInteger adder = new AtomicInteger();

    StepVerifier.withVirtualTime(() -> createFlux()
            .subscribeOn(scheduler)
            .doOnNext(v -> adder.incrementAndGet())
    )
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(10))
                .thenConsumeWhile(t -> true)
                .verifyComplete();

    System.out.println("Total number of values in generated lists: " + adder.get());
}

expectNextCount(1) 修改为 expectNextCount(100_000),我有一个 运行 打印了 Total number of values in generated lists: 102405 并花费了 40 毫秒。