集成测试反应性 Spring 云流

Integration Test a Reactive Spring Cloud Stream

TLDR;如何使用 Test Binder 测试 Reactive Function 组合?

我有一个使用 Reactive Functions 的 Spring Cloud Stream,但我不知道如何测试它。我没有看到任何关于如何从输入源到输出目标活页夹进行集成测试的官方文档。

在我的具体案例中,我正在使用 Reactive Supplier 和 IntegrationReactiveUtils.messageChannelToFlux() 模式连接 Spring 集成流程。这在开发环境中有效——我可以使用 Spring 集成流程从 RabbitMQ 中提取消息,然后它们进入 SCSt。

我的 SCSt 有几个链接在一起的函数,每个函数都是反应性的。它们的组成类似于 func1|func2|func3。我验证了这适用于 dev Rabbit(源)和 Kafka(目标)。

我似乎无法弄清楚如何测试它,而且似乎没有关于测试完整反应流的任何官方文档。现在我的代码大致如下所示:

@Autowired
MessageChannel inputChannel;

@Autowired
private OutputDestination output;

@Test
void myTest() {
    //omitted prep of var 'messageToSend'
    this.inputChannel.send(messageToSend);
    var outputMessage = output.receive(5000);
    Assertions.assertNotNull(outputMessage.getPayload());
}

我收到的错误是 output.receive(5000) returns null。我怀疑是线程问题,因为我没有订阅 Flux 并等待完成。

我在 Flux 函数中有一个 运行 调试器,看到消息一直到最后,没有任何错误或异常。

我真的想通了。我必须指定活页夹名称。我有一个测试 属性 spring.cloud.stream.bindings.processingStream 集,我认为它产生了 2 个新绑定(processingStream-in-0processingStream-out-0)。

原来我不得不在测试代码中设置绑定名称,如output.receive(5000, "processingStream"),没有-out-0后缀。我现在可以从流中接收消息了。