Spring 云流 RabbitMQ - 使用一个路由密钥将 DLQ 与交换绑定

Spring cloud stream RabbitMQ - bind DLQ with an exchange using one routing key

我正在使用 Spring Cloud Stream 版本 3.0.6.RELEASE

我有一个名为 my.queue.exchange 的现有交易所。我的应用程序包含一个消费者,我想创建一个名为 MY_QUEUE 的队列并将该队列绑定到 my.queue.exchange 交换。此外,我想将失败的消息重新发布到名为 MY_QUEUE_DLQ.

的 DLQ

我的问题是名为 MY_QUEUE_DLQ 的死信队列绑定到 my.queue.exchange.dlx 与两个路由密钥而不是一个交换,第一个具有路由密钥 my.queue.rkey.dlx,第二个具有路由密钥一个带有路由键 MY_QUEUE

我的消费者 bean:

@Bean
public Consumer<Dto> consumeFunction() {

    return dto -> {
       // do stuff
    };
}

我的application.yml:

spring:
  cloud:
    stream:
      function:
        definition: consumeFunction
      rabbit:
        bindings:
          consumeFunction-in-0:
            consumer:
              autoBindDlq: true
              deadLetterQueueName: MY_QUEUE_DLQ
              deadLetterExchange: my.queue.exchange.dlx
              deadLetterRoutingKey: my.queue.rkey.dlx
              deadLetterExchangeType: topic
              declareExchange: false
              bindQueue: true
              queueNameGroupOnly: true
              bindingRoutingKey: 'my.queue.rkey'
      bindings:
        consumeFunction-in-0:
          destination: my.queue.exchange
          group: MY_QUEUE

您需要在兔子消费者属性上设置 republishToDlq: false。 在RabbitExchangeQueueProvisioner中可以看到如下代码(见评论_

if (properties instanceof RabbitConsumerProperties
                    && ((RabbitConsumerProperties) properties).isRepublishToDlq()) {
    /*
     * Also bind with the base queue name when republishToDlq is used, which
     * does not know about partitioning
     */
    declareBinding(dlqName, new Binding(dlq.getName(), DestinationType.QUEUE,
                        dlxName, baseQueueName, arguments));
}

您还可以获得有关原因的更多详细信息here

此外,我看到你正在使用

spring:
  cloud:
    stream:
      function:
        definition: consumeFunction

请改为

spring:
  cloud:
    function:
      definition: consumeFunction

因为另一个 属性 已弃用并已在 3.2 版本中删除。