Reactor TestPublisher 不适用于未完成的通量

Reactor TestPublisher not working with non-completing flux

我认为这是一个关于使用项目反应器 (java) 进行单元测试的非常基本的问题,但我一直在查看文档,但无法弄清楚我的误解是什么。我一直在尝试使用 TestPublisher 和 StepVerifier 来测试应该永无止境的通量,所以我的场景不包括终端信号。

虽然使用 Sinks.many().unicast() 创建的常规接收器按我预期的方式工作,但使用 TestPublisher 却没有:

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

import java.time.Duration;

public class ExploreStepVerifier {

    // this test fails: Did not observe any item or terminal signal within 1000ms in 'source(FluxSource)'
    @Test
    void test_with_test_publisher() {
        TestPublisher<String> publisher = TestPublisher.create();
        publisher.next("hello");
        var result = publisher.flux();
        StepVerifier.create(result)
            .expectNext("hello").as("initial message received")
            .verifyTimeout(Duration.ofSeconds(1));
    }

    // this test succeeds
    @Test
    void test_with_sink() {
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
        sink.tryEmitNext("hello");
        var result = sink.asFlux();
        StepVerifier.create(result)
            .expectNext("hello").as("initial message received")
            .verifyTimeout(Duration.ofSeconds(1));
    }
}

我希望我能够用 TestPublisher 代替接收器,如 test_with_test_publisher 所示,但它不起作用。谁能解释一下? 谢谢--

接收器变体“预热”并保留(有限的)数量的信号,如果它们发生在任何订阅者进来之前,而 TestPublisher 则不会(如果没有人在听,该值会被丢弃)除非你使用冷变体Test Publisher.createCold()