Spring Cloud Stream - 测试共享主题的消费和生成
Spring Cloud Stream - Testing consuming from and producing into a shared topic
我一直在尝试实施以下解决方案:
我的应用程序期望使用来自 all-messages
的消息 A
,执行一些业务逻辑然后生成
消息 B
返回 all-messages
.
我使用 StreamBridge
而不是 Function<A,B>
的原因是因为我希望制作方使用任意
生成消息的数量,但为了这个示例,我试图将场景简化为一个。
此外,还有一个自定义路由功能以避免无限循环,它会发送传入的消息
进入适当的消费者,incoming
或 discarded
,如果有一种方法可以有效地丢弃消息,那就太好了。
话虽如此,我无法使用 Spring Cloud Stream 完全正确地实施。
我希望你能帮助我理解我做错了什么以及如何修复当前的 configure/setup 以使解决方案按预期工作,具体来说:
- Spring 云流是否支持此解决方案?
- 我的应用程序配置是否正确地实现了上面的解决方案图?
- 当 sending/receiving 消息时,我应该在应用程序中使用哪些绑定?
最让人头疼的是绑定,所以我尝试用传入和传出绑定的不同组合编写一个测试,看看是什么,大致如下:
class ScsProblemTests {
/* ... */
@ParameterizedTest
@MethodSource("bindings")
void consumeFromAndProduceIntoSharedTopic(String incomingBinding, String outgoingBinding) {
givenNoOutgoingMessages();
whenAnIncomingMessageArrives(incomingBinding);
thenEventuallyAnOutgoingMessageIsProduced(outgoingBinding);
}
public static Stream<Arguments> bindings() {
return Stream.of(
Arguments.of(null, null),
Arguments.of(null, "outgoing-out-0"),
Arguments.of(null, "all-messages"),
Arguments.of("incoming-in-0", null),
Arguments.of("incoming-in-0", "outgoing-out-0"),
Arguments.of("incoming-in-0", "all-messages"),
Arguments.of("all-messages", null),
Arguments.of("all-messages", "outgoing-out-0"),
Arguments.of("all-messages", "all-messages")
);
}
/* ... */
}
我运行这组测试有overrides
spring配置文件,我根据图表设置目标覆盖,我也用不同的 spring 配置文件,没有覆盖,同样只是为了有一个控制组来比较。 no-overrides
配置文件中只有 2 个测试通过,其余测试失败。
no-overrides
配置文件显然与设计不匹配,但我很想知道覆盖是如何影响结果的,特别是通过的 no-overrides
测试是:
- incomingBinding=null, outgoingBinding=null
- incomingBinding=null, outgoingBinding=outgoing-out-0
根据我对 Spring Cloud Stream 的理解,即使在这种 no-overrides
情况下,我也希望以下内容能够通过(但不是):
- incomingBinding=incoming-in-0,outgoingBinding=null
- incomingBinding=incoming-in-0, outgoingBinding=outgoing-out-0
在这一点上,我开始认为我误解了 Spring Cloud Stream 背后的一些概念,但我真的希望
你可以提供一些有用的建议。
为方便起见,我已将我的代码共享到 this 存储库中。
提前致谢。
在我看来,您正在为不同类型的消息重复使用同一个队列,这迫使您引入路由。此外,这两个函数似乎存在于同一运行时space,那么为什么不只是pass-by-reference
(从另一个函数调用一个函数),为什么需要引入网络。
我不知道你的用例,因此只能评论我所看到的,但这里似乎存在设计问题。也许如果您真的分享了用例,我们可以帮助您找到合适的解决方案。
好的,这是我的实现中的问题以及解决方法:
- 我在路由配置中输入错误,所以没有启用,这是在 spring 云中启用功能路由器的方法:
spring:
cloud:
stream:
function:
routing:
enabled: true
- 因为我有一个功能路由器,我不再需要配置我的
incoming-in-0
绑定,而是需要为功能路由器配置一个目的地:
spring:
cloud:
stream:
bindings:
functionRouter-in-0:
destination: all-messages
outgoing-out-0:
destination: all-messages
source: outgoing
# ...
- 我误解了 bindings/destinations 以及如何使用框架提供的测试助手,特别是
InputDestination
和 OutputDestination
。我不确定应该使用什么参数来发送消息或接收消息。答案是那些组件在那里模拟真正的绑定器(例如 RabbitMQ、Kafka 等),它们不知道 binding
(这是来自 spring 云的构造),它们只知道 destination
。
所以在我的例子中,翻译成这样:
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class ScsProblemTests {
@Autowired
private InputDestination input;
@Autowired
private OutputDestination output;
/* ... */
@Test
void consumeFromAndProduceIntoSharedTopic() {
// prepare message A ...
// simulate message "A" arriving into "all-messages"
input.send(messageA, "all-messages")
// ...
// application will pick up the message
// the function router will dispatch the message to the right consumer
// the consumer does some business logic
// eventually a message "B" should be produced into "all-messages"
// check if "all-messages" contains message "B"
// NOTE: "all-messages" will contain both "A" and "B"
var discard = output.receive(1000, "all-messages"); // message A
var messageB = output.receive(1000, "all-messages");
// assertions ...
}
/* ... */
}
注意:根据 pseudo-code 中的注释,最终状态由 all-messages
中的 A
和 B
表示,在这种情况下 OutputDestination
只是共享频道的 window,显然也包含我们发送的初始消息。
希望这是有道理的。我清理了代码的工作版本并将其推送到同一存储库中的 fixed
分支,以便您可以看到实际的修复。
我一直在尝试实施以下解决方案:
我的应用程序期望使用来自 all-messages
的消息 A
,执行一些业务逻辑然后生成
消息 B
返回 all-messages
.
我使用 StreamBridge
而不是 Function<A,B>
的原因是因为我希望制作方使用任意
生成消息的数量,但为了这个示例,我试图将场景简化为一个。
此外,还有一个自定义路由功能以避免无限循环,它会发送传入的消息
进入适当的消费者,incoming
或 discarded
,如果有一种方法可以有效地丢弃消息,那就太好了。
话虽如此,我无法使用 Spring Cloud Stream 完全正确地实施。
我希望你能帮助我理解我做错了什么以及如何修复当前的 configure/setup 以使解决方案按预期工作,具体来说:
- Spring 云流是否支持此解决方案?
- 我的应用程序配置是否正确地实现了上面的解决方案图?
- 当 sending/receiving 消息时,我应该在应用程序中使用哪些绑定?
最让人头疼的是绑定,所以我尝试用传入和传出绑定的不同组合编写一个测试,看看是什么,大致如下:
class ScsProblemTests {
/* ... */
@ParameterizedTest
@MethodSource("bindings")
void consumeFromAndProduceIntoSharedTopic(String incomingBinding, String outgoingBinding) {
givenNoOutgoingMessages();
whenAnIncomingMessageArrives(incomingBinding);
thenEventuallyAnOutgoingMessageIsProduced(outgoingBinding);
}
public static Stream<Arguments> bindings() {
return Stream.of(
Arguments.of(null, null),
Arguments.of(null, "outgoing-out-0"),
Arguments.of(null, "all-messages"),
Arguments.of("incoming-in-0", null),
Arguments.of("incoming-in-0", "outgoing-out-0"),
Arguments.of("incoming-in-0", "all-messages"),
Arguments.of("all-messages", null),
Arguments.of("all-messages", "outgoing-out-0"),
Arguments.of("all-messages", "all-messages")
);
}
/* ... */
}
我运行这组测试有overrides
spring配置文件,我根据图表设置目标覆盖,我也用不同的 spring 配置文件,没有覆盖,同样只是为了有一个控制组来比较。 no-overrides
配置文件中只有 2 个测试通过,其余测试失败。
no-overrides
配置文件显然与设计不匹配,但我很想知道覆盖是如何影响结果的,特别是通过的 no-overrides
测试是:
- incomingBinding=null, outgoingBinding=null
- incomingBinding=null, outgoingBinding=outgoing-out-0
根据我对 Spring Cloud Stream 的理解,即使在这种 no-overrides
情况下,我也希望以下内容能够通过(但不是):
- incomingBinding=incoming-in-0,outgoingBinding=null
- incomingBinding=incoming-in-0, outgoingBinding=outgoing-out-0
在这一点上,我开始认为我误解了 Spring Cloud Stream 背后的一些概念,但我真的希望 你可以提供一些有用的建议。
为方便起见,我已将我的代码共享到 this 存储库中。
提前致谢。
在我看来,您正在为不同类型的消息重复使用同一个队列,这迫使您引入路由。此外,这两个函数似乎存在于同一运行时space,那么为什么不只是pass-by-reference
(从另一个函数调用一个函数),为什么需要引入网络。
我不知道你的用例,因此只能评论我所看到的,但这里似乎存在设计问题。也许如果您真的分享了用例,我们可以帮助您找到合适的解决方案。
好的,这是我的实现中的问题以及解决方法:
- 我在路由配置中输入错误,所以没有启用,这是在 spring 云中启用功能路由器的方法:
spring:
cloud:
stream:
function:
routing:
enabled: true
- 因为我有一个功能路由器,我不再需要配置我的
incoming-in-0
绑定,而是需要为功能路由器配置一个目的地:
spring:
cloud:
stream:
bindings:
functionRouter-in-0:
destination: all-messages
outgoing-out-0:
destination: all-messages
source: outgoing
# ...
- 我误解了 bindings/destinations 以及如何使用框架提供的测试助手,特别是
InputDestination
和OutputDestination
。我不确定应该使用什么参数来发送消息或接收消息。答案是那些组件在那里模拟真正的绑定器(例如 RabbitMQ、Kafka 等),它们不知道binding
(这是来自 spring 云的构造),它们只知道destination
。 所以在我的例子中,翻译成这样:
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class ScsProblemTests {
@Autowired
private InputDestination input;
@Autowired
private OutputDestination output;
/* ... */
@Test
void consumeFromAndProduceIntoSharedTopic() {
// prepare message A ...
// simulate message "A" arriving into "all-messages"
input.send(messageA, "all-messages")
// ...
// application will pick up the message
// the function router will dispatch the message to the right consumer
// the consumer does some business logic
// eventually a message "B" should be produced into "all-messages"
// check if "all-messages" contains message "B"
// NOTE: "all-messages" will contain both "A" and "B"
var discard = output.receive(1000, "all-messages"); // message A
var messageB = output.receive(1000, "all-messages");
// assertions ...
}
/* ... */
}
注意:根据 pseudo-code 中的注释,最终状态由 all-messages
中的 A
和 B
表示,在这种情况下 OutputDestination
只是共享频道的 window,显然也包含我们发送的初始消息。
希望这是有道理的。我清理了代码的工作版本并将其推送到同一存储库中的 fixed
分支,以便您可以看到实际的修复。