Spring WebFlux 和 Mockito:如何验证流是否被消费以及参与的处理程序是否被调用

Spring WebFlux & Mockito: How to verify the stream was consumed and participating handlers were invoked

我正在试验 Spring on Reactive stack。我对 Java 中的反应流比较陌生。我正在努力思考如何在反应流中测试和验证对象协作。我正在使用 spring-testreactor-test 库进行 TDD 测试。

这是有缺陷的测试

@Test
void givenWeatherService_whenGetWeather_thenPublishWeatherAsync() throws Exception {
    
    Mono<Sample> mono = service.updateWeatherFeed();
    
    RecordedRequest request = server.takeRequest();
    assertThat(request.getMethod()).isEqualTo("GET");
    assertThat(request.getPath()).isEqualTo("/api/weather");
    
    StepVerifier.create(mono)
        .assertNext(sample -> {
            verify(publisher, times(1)).publish(sample);
            verify(sensorRepository, times(1)).insertSample(sample);
        }).verifyComplete();
}

这里是服务方法实现

public Mono<Sample> updateWeatherFeed() {

    // Uses WebClient to return the stream
    Mono<Sample> mono = getWeather();
    
    mono.doOnNext(sample -> {
        sensorRepository.insertSample(sample);
        samplePublisher.publish(sample);
    }).subscribe();
    
    return mono;
}

StepVerifier 挂起并显示以下日志:

10:21:01.827 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x81e9584a, L:/127.0.0.1:62510 - R:localhost/127.0.0.1:62479] Channel cleaned, now 1 active connections and 1 inactive connections

验证流是否已被消费以及参与的处理程序是否已按顺序调用的正确方法是什么?

你的问题可能出在这里:

public Mono<Sample> updateWeatherFeed() {

    // Uses WebClient to return the stream
    Mono<Sample> mono = getWeather();
    
    mono.doOnNext(sample -> {
        sensorRepository.insertSample(sample);
        samplePublisher.publish(sample);
    }).subscribe(); // <--- right here
    
    return mono;
}

您看到您正在订阅并因此使用流。如果您在 api 中查找 Mono#subscribe,您会发现它 returns 是一次性的。这意味着它已被消耗,并准备好处理。

反应流由 producerconsumer 组成。 consumer subscribesproducer 并开始 publishing。发起呼叫的人是 consumer,因此应该是 subscribe

在您的情况下,测试是发起呼叫的测试,因此测试是 consumer 并且应该是 subscribing.

如果我们查看 StepVerifier#create,我们可以看到它需要 Publisher 并且发布者是 non/consumed MonoFlux,而不是Disposable.

那么我们该如何解决这个问题呢,你需要删除 subscribe()

public Mono<Sample> updateWeatherFeed() {
    return getWeather().flatMap(sample -> {
        sensorRepository.insertSample(sample);
        samplePublisher.publish(sample);
        return sample;
    });
}

那么您不需要 StepVerifier,因为您返回的是 Mono,这只是一项。如果你在哪里测试 Flux 你可以使用 StepVerifier 来“单步执行”你的 Flux.

// Set up mocks and such

@Test
void doTest() throws Exception {

    final Service service = new Service(publisherMock, sensorRepositoryMock);
    final Sample sample = service.updateWeatherFeed().block();
    
    // Assert Sample
    assertEquals(sample, ...);

    // Verify mocks
    verify(publisherMock, times(1)).publish(sample);
    verify(sensorRepositoryMock, times(1)).insertSample(sample);
}

我做这个已经有一段时间了,但我是徒手写的,所以我希望你至少知道它的要点。