Spring Cloud Stream - 测试共享主题的消费和生成

Spring Cloud Stream - Testing consuming from and producing into a shared topic

我一直在尝试实施以下解决方案:

我的应用程序期望使用来自 all-messages 的消息 A,执行一些业务逻辑然后生成 消息 B 返回 all-messages.

我使用 StreamBridge 而不是 Function<A,B> 的原因是因为我希望制作方使用任意 生成消息的数量,但为了这个示例,我试图将场景简化为一个。

此外,还有一个自定义路由功能以避免无限循环,它会发送传入的消息 进入适当的消费者,incomingdiscarded,如果有一种方法可以有效地丢弃消息,那就太好了。

话虽如此,我无法使用 Spring Cloud Stream 完全正确地实施。

我希望你能帮助我理解我做错了什么以及如何修复当前的 configure/setup 以使解决方案按预期工作,具体来说:

最让人头疼的是绑定,所以我尝试用传入和传出绑定的不同组合编写一个测试,看看是什么,大致如下:

class ScsProblemTests {

    /* ... */

    @ParameterizedTest
    @MethodSource("bindings")
    void consumeFromAndProduceIntoSharedTopic(String incomingBinding, String outgoingBinding) {
        givenNoOutgoingMessages();
        whenAnIncomingMessageArrives(incomingBinding);
        thenEventuallyAnOutgoingMessageIsProduced(outgoingBinding);
    }

    public static Stream<Arguments> bindings() {
        return Stream.of(
            Arguments.of(null, null),
            Arguments.of(null, "outgoing-out-0"),
            Arguments.of(null, "all-messages"),
            Arguments.of("incoming-in-0", null),
            Arguments.of("incoming-in-0", "outgoing-out-0"),
            Arguments.of("incoming-in-0", "all-messages"),
            Arguments.of("all-messages", null),
            Arguments.of("all-messages", "outgoing-out-0"),
            Arguments.of("all-messages", "all-messages")
        );
    }

    /* ... */
}

我运行这组测试有overridesspring配置文件,我根据图表设置目标覆盖,我也用不同的 spring 配置文件,没有覆盖,同样只是为了有一个控制组来比较。 no-overrides 配置文件中只有 2 个测试通过,其余测试失败。

no-overrides 配置文件显然与设计不匹配,但我很想知道覆盖是如何影响结果的,特别是通过的 no-overrides 测试是:

根据我对 Spring Cloud Stream 的理解,即使在这种 no-overrides 情况下,我也希望以下内容能够通过(但不是):

在这一点上,我开始认为我误解了 Spring Cloud Stream 背后的一些概念,但我真的希望 你可以提供一些有用的建议。

为方便起见,我已将我的代码共享到 this 存储库中。

提前致谢。

在我看来,您正在为不同类型的消息重复使用同一个队列,这迫使您引入路由。此外,这两个函数似乎存在于同一运行时space,那么为什么不只是pass-by-reference(从另一个函数调用一个函数),为什么需要引入网络。

我不知道你的用例,因此只能评论我所看到的,但这里似乎存在设计问题。也许如果您真的分享了用例,我们可以帮助您找到合适的解决方案。

好的,这是我的实现中的问题以及解决方法:

  1. 我在路由配置中输入错误,所以没有启用,这是在 spring 云中启用功能路由器的方法:
spring:
  cloud:
    stream:
      function:
        routing:
          enabled: true
  1. 因为我有一个功能路由器,我不再需要配置我的 incoming-in-0 绑定,而是需要为功能路由器配置一个目的地:
spring:
  cloud:
    stream:
      bindings:
        functionRouter-in-0:
          destination: all-messages
        outgoing-out-0:
          destination: all-messages
      source: outgoing
      # ...
  1. 我误解了 bindings/destinations 以及如何使用框架提供的测试助手,特别是 InputDestinationOutputDestination。我不确定应该使用什么参数来发送消息或接收消息。答案是那些组件在那里模拟真正的绑定器(例如 RabbitMQ、Kafka 等),它们不知道 binding(这是来自 spring 云的构造),它们只知道 destination。 所以在我的例子中,翻译成这样:
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class ScsProblemTests {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    /* ... */

    @Test
    void consumeFromAndProduceIntoSharedTopic() {
        // prepare message A ... 

        // simulate message "A" arriving into "all-messages"
        input.send(messageA, "all-messages")
        
        // ...
        // application will pick up the message
        // the function router will dispatch the message to the right consumer
        // the consumer does some business logic
        // eventually a message "B" should be produced into "all-messages"
        
        // check if "all-messages" contains message "B"
        // NOTE: "all-messages" will contain both "A" and "B"
        var discard = output.receive(1000, "all-messages"); // message A
        var messageB = output.receive(1000, "all-messages");
        // assertions ...
    }

    /* ... */
}

注意:根据 pseudo-code 中的注释,最终状态由 all-messages 中的 AB 表示,在这种情况下 OutputDestination 只是共享频道的 window,显然也包含我们发送的初始消息。

希望这是有道理的。我清理了代码的工作版本并将其推送到同一存储库中的 fixed 分支,以便您可以看到实际的修复。