使用 spring 云流 3.0+ 从 RabbitMQ 队列消费消息
Consume messages from RabbitMQ queue using spring cloud stream 3.0+
我有一个生产者使用直接交换在 RabbitMQ 队列中生成消息。
队列名称:TEMP_QUEUE,
交易所名称:TEMP_DIRECT_EXCHANGE
生成此队列很容易,因为在我的生成器应用程序中我使用了我熟悉的 Spring AMQP。
在我的消费者应用程序中,我需要使用 Spring cloud stream 版本 3.0+。
我想避免使用像 @EnableBinding
、@StreamListener
这样的遗留注释,因为它们即将被弃用。
我的应用程序的旧代码如下所示:
@EnableBinding(Bindings.class)
public class TempConsumer {
@StreamListener(target = "TEMP_QUEUE")
public void consumeFromTempQueue(MyObject object) {
// do stuff with the object
}
}
public interface Bindings {
@Input("TEMP_QUEUE")
SubscribableChannel myInputBinding();
}
从他们的文档中我发现我可以做类似的事情
@Bean
public Consumer<MyObject> consumeFromTempQueue() {
return obj -> {
// do stuff with the object
};
}
我不清楚如何指定此 bean 将从 TEMP_QUEUE 消耗?另外,如果我想从多个队列中消费怎么办?
您需要使用 application.yml
来绑定您的 bean。
spring.cloud.stream:
function.definition: consumeFromTempQueue
您也可以使用此配置来配置源、进程和接收器。在你的情况下,你只是在使用一个来源。
您可以阅读此 post 了解更多信息。
参见Consuming from Existing Queues/Exchanges。
您可以使用
从多个队列中消费
spring.cloud.stream.bindings.consumeFromTempQueue-in-0.destination=q1,q2,q3
spring.cloud.stream.bindings.consumer.multiplex=true
如果没有多路复用,您将获得 3 个绑定;使用多路复用,您将获得 1 个监听器容器监听多个队列。
我有一个生产者使用直接交换在 RabbitMQ 队列中生成消息。
队列名称:TEMP_QUEUE, 交易所名称:TEMP_DIRECT_EXCHANGE
生成此队列很容易,因为在我的生成器应用程序中我使用了我熟悉的 Spring AMQP。
在我的消费者应用程序中,我需要使用 Spring cloud stream 版本 3.0+。
我想避免使用像 @EnableBinding
、@StreamListener
这样的遗留注释,因为它们即将被弃用。
我的应用程序的旧代码如下所示:
@EnableBinding(Bindings.class)
public class TempConsumer {
@StreamListener(target = "TEMP_QUEUE")
public void consumeFromTempQueue(MyObject object) {
// do stuff with the object
}
}
public interface Bindings {
@Input("TEMP_QUEUE")
SubscribableChannel myInputBinding();
}
从他们的文档中我发现我可以做类似的事情
@Bean
public Consumer<MyObject> consumeFromTempQueue() {
return obj -> {
// do stuff with the object
};
}
我不清楚如何指定此 bean 将从 TEMP_QUEUE 消耗?另外,如果我想从多个队列中消费怎么办?
您需要使用 application.yml
来绑定您的 bean。
spring.cloud.stream:
function.definition: consumeFromTempQueue
您也可以使用此配置来配置源、进程和接收器。在你的情况下,你只是在使用一个来源。
您可以阅读此 post 了解更多信息。
参见Consuming from Existing Queues/Exchanges。
您可以使用
从多个队列中消费spring.cloud.stream.bindings.consumeFromTempQueue-in-0.destination=q1,q2,q3
spring.cloud.stream.bindings.consumer.multiplex=true
如果没有多路复用,您将获得 3 个绑定;使用多路复用,您将获得 1 个监听器容器监听多个队列。