使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列

Bind RabbitMQ consumer using Spring Cloud Stream to an existing queue

我使用 RabbitMQ web-UI 创建了一个主题交换 TX 并绑定到交换两个队列 TX.Q1TX.Q2,每个都相应地绑定了路由键 rk1rk2,并且向交易所发送了一些消息。

现在我想使用 Spring Cloud Stream 创建一个消费者,它将仅接收来自 Q1 的消息。 我尝试使用配置:

spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1

和使用消息的方法的注解@StreamListner(Sink.INPUT)

结果我可以看到消费者已经创建了一个同名的队列(或绑定)TX.Q1,但是新 [=36= 的 Routing-Key ]是#.
我如何通过 Spring Cloud Stream 配置一个消费者,该消费者将使用来自预定义队列的消息(仅使用 rk1 路由)。

Spring Cloud Stream 在内部将消费者端点的路由器密钥设置为目的地名称(exchange 名称)本身或基于 partition [=18] 的路由=] 在静态分区的情况下。

我认为 this github 问题可能与您的情况有关。

所以现在,Garry Russell 建议的解决方法已经为我解决了这个问题。

我这样使用 @RabbitListener 而不是 @StreamListenet
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1").

因此,预定义队列 TX.Q1 与绑定密钥绑定:rk1 到交换 TX.

等待 Spring Cloud Steream issue 上的更新。

我想我找到了使用 @StreamListener 的解决方案,而不是使用解决方法。一切都在配置中完成,而不是在代码中。

我使用的配置如下(在 .yml 中,但您可以轻松地将其转换为 .properties):

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: <binder_name>
          destination: TX
          group: Q1
      binders:
        <binder_name>:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: <port>
                virtual-host: <vhost>
                username: <username>
                password: <password>
      rabbit:
        bindings:
          input:
            consumer:
              binding-routing-key: rk1
              exchange-name: TX
              queue-name-group-only: true
              bind-queue: true
              exchange-durable: true
              exchange-type: topic

使用这种方法,您不必编写特定代码来让 RabbitMQ 消费者连接到您的集群,这应该可以解决您的问题。

希望对您有所帮助。

鼓励在 consumer 下使用此 属性 以使 rabbit 能够从现有队列中消费。请注意,队列名称将仅从组 属性 中选取,而不是从目标中选取。

queueNameGroupOnly: true

示例:

cloud:
stream:
  # rabbit setting: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
  rabbit:
    bindings:
      input:
        consumer:
          acknowledgeMode: AUTO
          bindingRoutingKey: DECISION_PERSISTENCE_KEY
          declareExchange: false
          bindQueue: false
          queueNameGroupOnly: true
          consumerTagPrefix: dpa-rabbit-consumer
  bindings:
    input:
      binder: rabbit
      group: DECISION_PERSISTENCE_QUEUE
      content-type: application/json