如何在 RabbitMQ 中处理 unknown/invalid 绑定路由键值?

How to handle unknown/invalid binding routing key values in RabbitMQ?

我想知道在交换中使用 unknown/invalid 路由键值处理消息的最佳方式是什么? 在我的例子中,我在同一个交换器内发送我的所有消息,并且基于路由键,消息被路由到相应的队列。 这是我的配置(我正在使用 Spring Cloud Stream):

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type

spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2

spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR

现在我想问的是,如果我用 payload.type='ANY' 发送消息会发生什么?显然,这条消息不会被任何消费者检索到,并且会保留在交换中,但是跟踪这些“未知”消息的最佳方式是什么?我可以为此使用 DLQ 吗?

谢谢!

您可以使用 mandatory 属性 作为您的 publishers

When a published message cannot be routed to any queue, 
and the publisher set the mandatory message property to true, the 
message will be returned to it.
The publisher must have a returned message handler set up in order to 
handle the return (e.g. by logging an error or retrying with a different exchange)

will remain inside the exchange,

没有;交换器不“保存”消息,它们只是路由器。

默认情况下会丢弃无法路由的邮件。

您可以将绑定配置为 return 无法路由的消息。

参见 Error Channels

Returns 是异步的。

在即将发布的3.1版本中,您可以等待一个future来判断消息是否发送成功。参见 Publisher Confirms

如果消息无法路由,则设置相关数据的 returnedMessage 属性。

该框架使用了另一个答案中提到的 mandatory 功能。

编辑

这是一个例子:

spring.rabbitmq.publisher-returns: true

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']

spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {

    public static void main(String[] args) {
        SpringApplication.run(So65134452Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(MessageBuilder.withPayload("foo")
                    .setHeader("rk", "invalid")
                    .build());
        };
    }

    @Autowired
    RabbitTemplate template;

    @Bean
    public Queue unroutable() {
        return new Queue("unroutable.messages");
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void error(Message<?> error) {
        if (error.getPayload() instanceof ReturnedAmqpMessageException) {
            this.template.send(unroutable().getName(),
                    ((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
        }
    }

}