使用 rabbitmq 了解 spring 云消息传递
Understanding spring cloud messaging with rabbitmq
我想我在理解 spring 云消息传递时遇到问题,无法找到我面临的 "problem" 问题的答案。
我有以下设置(使用 spring-boot 2.0.3.RELEASE)。
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
cloud:
stream:
bindings:
input:
destination: foo
group: fooGroup
fooChannel:
destination: foo
服务class
@Autowired
FoodOrderController foodOrderController;
@Bean
public CommandLineRunner runner() {
return (String[] args) -> {
IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
};
}
@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
System.out.println("This was a great meal!: "+ meal);
}
@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
System.out.println("This was a great meal!: "+ meal);
}
FoodOrderController
public class FoodOrderController {
@Autowired
FoodOrderSource foodOrderSource;
public String orderFood(){
var foodOrder = new FoodOrder();
foodOrder.setCustomerAddress(UUID.randomUUID().toString());
foodOrder.setOrderDescription(UUID.randomUUID().toString());
foodOrder.setRestaurant("foo");
foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
// System.out.println(foodOrder.toString());
return "food ordered!";
}
}
FoodOrderSource
public interface FoodOrderSource {
String INPUT = "foo";
String OUTPUT = "fooChannel";
@Input("foo")
SubscribableChannel foo();
@Output("fooChannel")
MessageChannel foodOrders();
}
FoodOrderPublisher
@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}
设置正常,但 StreamListener
收到的消息相同。所以一切都被记录了两次。阅读文档,它说在队列绑定中指定一个 group
,两个侦听器都将在组内注册,只有一个侦听器会收到一条消息。我知道上面的示例不合理,但我想模拟具有多个侦听器设置的多节点环境。
为什么两个听众都收到消息?我怎样才能确保在设置组中只收到一次消息?
根据文档,默认情况下消息也应该自动确认,但我找不到任何表明消息实际得到确认的信息。我在这里遗漏了什么吗?
这里是兔子管理员的一些截图
Reading the documentation, it says specifying a group inside the queues bindings, both the listeners will be registered inside the group and only one listener will receive a single message.
当侦听器位于不同的应用程序实例中时,情况确实如此。当同一实例中有多个侦听器时,他们都会收到相同的消息。这通常与 condition
一起使用,每个听众都可以表达对他们感兴趣的食物的兴趣。Documented here.
基本上,竞争消费者是将消息分派给应用程序中实际 @StreamListener
的绑定本身。
所以,你不能"mimic a multi-node environment with multiple listeners setup"这样。
but I can't find anything that indicates that the messages actually get acknowledged
你这是什么意思?如果消息处理成功,容器确认消息并将其从队列中移除。
post 上已经回复了正确答案,但您仍然可以查看此内容:
我想我在理解 spring 云消息传递时遇到问题,无法找到我面临的 "problem" 问题的答案。
我有以下设置(使用 spring-boot 2.0.3.RELEASE)。
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
cloud:
stream:
bindings:
input:
destination: foo
group: fooGroup
fooChannel:
destination: foo
服务class
@Autowired
FoodOrderController foodOrderController;
@Bean
public CommandLineRunner runner() {
return (String[] args) -> {
IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
};
}
@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
System.out.println("This was a great meal!: "+ meal);
}
@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
System.out.println("This was a great meal!: "+ meal);
}
FoodOrderController
public class FoodOrderController {
@Autowired
FoodOrderSource foodOrderSource;
public String orderFood(){
var foodOrder = new FoodOrder();
foodOrder.setCustomerAddress(UUID.randomUUID().toString());
foodOrder.setOrderDescription(UUID.randomUUID().toString());
foodOrder.setRestaurant("foo");
foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
// System.out.println(foodOrder.toString());
return "food ordered!";
}
}
FoodOrderSource
public interface FoodOrderSource {
String INPUT = "foo";
String OUTPUT = "fooChannel";
@Input("foo")
SubscribableChannel foo();
@Output("fooChannel")
MessageChannel foodOrders();
}
FoodOrderPublisher
@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}
设置正常,但 StreamListener
收到的消息相同。所以一切都被记录了两次。阅读文档,它说在队列绑定中指定一个 group
,两个侦听器都将在组内注册,只有一个侦听器会收到一条消息。我知道上面的示例不合理,但我想模拟具有多个侦听器设置的多节点环境。
为什么两个听众都收到消息?我怎样才能确保在设置组中只收到一次消息?
根据文档,默认情况下消息也应该自动确认,但我找不到任何表明消息实际得到确认的信息。我在这里遗漏了什么吗?
这里是兔子管理员的一些截图
Reading the documentation, it says specifying a group inside the queues bindings, both the listeners will be registered inside the group and only one listener will receive a single message.
当侦听器位于不同的应用程序实例中时,情况确实如此。当同一实例中有多个侦听器时,他们都会收到相同的消息。这通常与 condition
一起使用,每个听众都可以表达对他们感兴趣的食物的兴趣。Documented here.
基本上,竞争消费者是将消息分派给应用程序中实际 @StreamListener
的绑定本身。
所以,你不能"mimic a multi-node environment with multiple listeners setup"这样。
but I can't find anything that indicates that the messages actually get acknowledged
你这是什么意思?如果消息处理成功,容器确认消息并将其从队列中移除。
post 上已经回复了正确答案,但您仍然可以查看此内容: