使用 spring 云流 3.0+ 从 RabbitMQ 队列消费消息

Consume messages from RabbitMQ queue using spring cloud stream 3.0+

我有一个生产者使用直接交换在 RabbitMQ 队列中生成消息。

队列名称:TEMP_QUEUE, 交易所名称:TEMP_DIRECT_EXCHANGE

生成此队列很容易,因为在我的生成器应用程序中我使用了我熟悉的 Spring AMQP

在我的消费者应用程序中,我需要使用 Spring cloud stream 版本 3.0+。

我想避免使用像 @EnableBinding@StreamListener 这样的遗留注释,因为它们即将被弃用。

我的应用程序的旧代码如下所示:

@EnableBinding(Bindings.class)
public class TempConsumer {

    @StreamListener(target = "TEMP_QUEUE")
    public void consumeFromTempQueue(MyObject object) {
        // do stuff with the object
    }
}

public interface Bindings {
    @Input("TEMP_QUEUE")
    SubscribableChannel myInputBinding();
}

从他们的文档中我发现我可以做类似的事情

@Bean
public Consumer<MyObject> consumeFromTempQueue() {
    return obj -> {
        // do stuff with the object
    };
}

我不清楚如何指定此 bean 将从 TEMP_QUEUE 消耗?另外,如果我想从多个队列中消费怎么办?

您需要使用 application.yml 来绑定您的 bean。

spring.cloud.stream:
  function.definition: consumeFromTempQueue

您也可以使用此配置来配置源、进程和接收器。在你的情况下,你只是在使用一个来源。

您可以阅读此 post 了解更多信息。

参见Consuming from Existing Queues/Exchanges

您可以使用

从多个队列中消费
spring.cloud.stream.bindings.consumeFromTempQueue-in-0.destination=q1,q2,q3
spring.cloud.stream.bindings.consumer.multiplex=true

如果没有多路复用,您将获得 3 个绑定;使用多路复用,您将获得 1 个监听器容器监听多个队列。