使用 Schedulers.single() 在 Reactor StepVerifier 中重新抛出 AssertionErrors

Rethrow AssertionErrors in Reactor StepVerifier using Schedulers.single()

我们正在使用 Project Reactor to run a particular operation asynchronously as per the code in ServiceTest below. To test this code, as per ServiceTest below, when setting up the Mono for the async operation we make the Mono pass it's result to a DirectProcessor with doOnNext that the test has access to, and then carry out our test call and assertions with StepVerifier

StepVerifier#assertNext 的 JavaDoc 读取

Any AssertionErrors thrown by the consumer will be rethrown during verification.

我们发现 仅当使用即时调度程序 (Schedulers.immediate()) is used and is not true when the single scheduler (Schedulers.single()) 时 是正确的。当使用单个调度程序时,不会重新抛出 AssertionErrors,即测试总是通过。

是否有可能,如果可以,如何使用单个调度程序并根据 JavaDoc 在验证期间重新抛出 AssertionErrors?

@Service
@RequiredArgsConstructor
public class Service implements WithReactive, WithTestProcessor<Response> {

    @Getter
    @Setter
    private DirectProcessor<Response> processor = DirectProcessor.create();

    @Setter
    private Scheduler scheduler = Schedulers.single();

    public void doAction() {
        Mono.fromSupplier(this::doActionAsync)
            .doOnNext(processor::onNext)
            .subscribeOn(scheduler)
            .subscribe();
    }

    private Response doActionAsync() {
        ...
    }

    ...

}
public interface WithReactive {
    void setScheduler(Scheduler scheduler);
}
public interface WithTestProcessor<T> {
    void setProcessor(DirectProcessor<T> processor);
    DirectProcessor<T> getProcessor();
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ServiceTest {

    @Inject
    private Collection<WithTestProcessor> withTestProcessors;

    @Before
    public void setTestProcessors() {
        withTestProcessors.forEach(withTestProcessor -> withTestProcessor.setProcessor(DirectProcessor.create()));
    }

    @Inject
    private Collection<WithReactive> withReactives;

    @Before
    public void makeReactiveSynchronous() {
        withReactives.forEach(withReactive -> withReactive.setScheduler(Schedulers.immediate()));
    }

    @Test
    private void test() {
      StepVerifier.create(service.getProcessor())
          .then(service::doAction)
          .assertNext(response -> assertThat(logExtractor.getInsertsByTable("assets")).hasSize(1))
          .thenCancel()
          .verify();
    }
}

这是三个因素的组合:初始 then、由于 subscribeOnthenCancel 而导致的订阅与验证同时发生的事实。

一种解决方法是在 StepVerifier 执行 thenCancel 之前为 onNext 提供足够的时间,方法是在 thenCancel.[=17 之前放置 thenAwait(Duration.ofMillis(10)) =]