如何在 RabbitMQ 中处理 unknown/invalid 绑定路由键值?
How to handle unknown/invalid binding routing key values in RabbitMQ?
我想知道在交换中使用 unknown/invalid 路由键值处理消息的最佳方式是什么?
在我的例子中,我在同一个交换器内发送我的所有消息,并且基于路由键,消息被路由到相应的队列。
这是我的配置(我正在使用 Spring Cloud Stream):
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type
spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2
spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR
现在我想问的是,如果我用 payload.type='ANY'
发送消息会发生什么?显然,这条消息不会被任何消费者检索到,并且会保留在交换中,但是跟踪这些“未知”消息的最佳方式是什么?我可以为此使用 DLQ 吗?
谢谢!
您可以使用 mandatory
属性 作为您的 publishers。
When a published message cannot be routed to any queue,
and the publisher set the mandatory message property to true, the
message will be returned to it.
The publisher must have a returned message handler set up in order to
handle the return (e.g. by logging an error or retrying with a different exchange)
will remain inside the exchange,
没有;交换器不“保存”消息,它们只是路由器。
默认情况下会丢弃无法路由的邮件。
您可以将绑定配置为 return 无法路由的消息。
参见 Error Channels。
Returns 是异步的。
在即将发布的3.1版本中,您可以等待一个future来判断消息是否发送成功。参见 Publisher Confirms。
如果消息无法路由,则设置相关数据的 returnedMessage
属性。
该框架使用了另一个答案中提到的 mandatory
功能。
编辑
这是一个例子:
spring.rabbitmq.publisher-returns: true
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']
spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {
public static void main(String[] args) {
SpringApplication.run(So65134452Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> {
output.send(MessageBuilder.withPayload("foo")
.setHeader("rk", "invalid")
.build());
};
}
@Autowired
RabbitTemplate template;
@Bean
public Queue unroutable() {
return new Queue("unroutable.messages");
}
@ServiceActivator(inputChannel = "errorChannel")
public void error(Message<?> error) {
if (error.getPayload() instanceof ReturnedAmqpMessageException) {
this.template.send(unroutable().getName(),
((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
}
}
}
我想知道在交换中使用 unknown/invalid 路由键值处理消息的最佳方式是什么? 在我的例子中,我在同一个交换器内发送我的所有消息,并且基于路由键,消息被路由到相应的队列。 这是我的配置(我正在使用 Spring Cloud Stream):
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type
spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2
spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR
现在我想问的是,如果我用 payload.type='ANY'
发送消息会发生什么?显然,这条消息不会被任何消费者检索到,并且会保留在交换中,但是跟踪这些“未知”消息的最佳方式是什么?我可以为此使用 DLQ 吗?
谢谢!
您可以使用 mandatory
属性 作为您的 publishers。
When a published message cannot be routed to any queue,
and the publisher set the mandatory message property to true, the
message will be returned to it.
The publisher must have a returned message handler set up in order to
handle the return (e.g. by logging an error or retrying with a different exchange)
will remain inside the exchange,
没有;交换器不“保存”消息,它们只是路由器。
默认情况下会丢弃无法路由的邮件。
您可以将绑定配置为 return 无法路由的消息。
参见 Error Channels。
Returns 是异步的。
在即将发布的3.1版本中,您可以等待一个future来判断消息是否发送成功。参见 Publisher Confirms。
如果消息无法路由,则设置相关数据的 returnedMessage
属性。
该框架使用了另一个答案中提到的 mandatory
功能。
编辑
这是一个例子:
spring.rabbitmq.publisher-returns: true
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']
spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {
public static void main(String[] args) {
SpringApplication.run(So65134452Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> {
output.send(MessageBuilder.withPayload("foo")
.setHeader("rk", "invalid")
.build());
};
}
@Autowired
RabbitTemplate template;
@Bean
public Queue unroutable() {
return new Queue("unroutable.messages");
}
@ServiceActivator(inputChannel = "errorChannel")
public void error(Message<?> error) {
if (error.getPayload() instanceof ReturnedAmqpMessageException) {
this.template.send(unroutable().getName(),
((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
}
}
}