Spring Cloud Stream:如何重新发布到死信队列并抛出异常
Spring Cloud Stream: how to republish to dead letter queue and also throw exception
我正在将一个使用 Spring AMQP 的项目迁移到一个使用 Spring Cloud Stream with RabbitMQ 的项目。
在我的旧项目中,当使用@RabbitListener 处理消息时发生某些异常时,会抛出该异常。如果有一个死信队列绑定,异常仍然会被抛出(如果有重试,只有一次,我猜是最后一次)。这对于记录目的非常有帮助。
在Spring云中,如果您定义属性,@StreamListener 有一个死信队列机制:
spring.cloud.stream.bindings.input1.destination=dest1
spring.cloud.stream.rabbit.bindings.input1.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input1.consumer.republishToDlq=true
但是如果你有这样的方法(只是一个例子):
@StreamListener("input1")
public void process(String message){
System.out.println("Trying...");
throw new RuntimeException();
}
日志是:
Trying...
Trying...
Trying...
(end of log, no exception thrown)
有没有办法抛出异常(只在最后一次重试)?
谢谢!
请参阅有关消费者属性的文档。
设置 ...consumer.max-attempts=1
以禁用重试。
您可以处理异常,记录它然后抛出 AmqpRejectAndDontRequeueException。这会将消息发送到死信队列
您在 @StreamListener
下,您希望异常发生在哪里?谁抓住了?
你可以这样做:
@StreamListener("input1")
public void process(String message){
try {
System.out.println("Trying...");
throw new RuntimeException();
// or the actual code that handle the message
} catch (RuntimeException re) {
// handle the exception, logging etc.
throw re
}
}
我正在将一个使用 Spring AMQP 的项目迁移到一个使用 Spring Cloud Stream with RabbitMQ 的项目。
在我的旧项目中,当使用@RabbitListener 处理消息时发生某些异常时,会抛出该异常。如果有一个死信队列绑定,异常仍然会被抛出(如果有重试,只有一次,我猜是最后一次)。这对于记录目的非常有帮助。
在Spring云中,如果您定义属性,@StreamListener 有一个死信队列机制:
spring.cloud.stream.bindings.input1.destination=dest1
spring.cloud.stream.rabbit.bindings.input1.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input1.consumer.republishToDlq=true
但是如果你有这样的方法(只是一个例子):
@StreamListener("input1")
public void process(String message){
System.out.println("Trying...");
throw new RuntimeException();
}
日志是:
Trying...
Trying...
Trying...
(end of log, no exception thrown)
有没有办法抛出异常(只在最后一次重试)?
谢谢!
请参阅有关消费者属性的文档。
设置 ...consumer.max-attempts=1
以禁用重试。
您可以处理异常,记录它然后抛出 AmqpRejectAndDontRequeueException。这会将消息发送到死信队列
您在 @StreamListener
下,您希望异常发生在哪里?谁抓住了?
你可以这样做:
@StreamListener("input1")
public void process(String message){
try {
System.out.println("Trying...");
throw new RuntimeException();
// or the actual code that handle the message
} catch (RuntimeException re) {
// handle the exception, logging etc.
throw re
}
}