Spring 云流+ spring 重试,如何添加发送到DLQ 的恢复回调和禁用逻辑?
Spring cloud stream + spring retry, How to add recovery callback and disable logic that send to DLQ?
我正在使用spring云流+ rabbit mq binder。
在我的 @StreaListener 中,我想使用 RetryTemplate 对特定异常应用重试逻辑。在重试用尽或抛出不可重试错误后,我想添加一个恢复回调,它将带有错误消息的新记录保存到我的 Postgres 数据库中,并以消息结束(移至下一个)。
这是我到目前为止得到的:
@StreamListener(Sink.INPUT)
public void saveUser(User user) {
User user = userService.saveUser(user); //could throw exceptions
log.info(">>>>>>User is created successfully: {}", user);
}
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(ConnectionException.class, true);
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
//could add recovery logic here, like save error to db why sertain user was not saved
log.info("retries exausted");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
log.error("Error on retry", throwable);
}
});
retryTemplate.setRetryPolicy(
new SimpleRetryPolicy(properties.getRetriesCount(), retryableExceptions, true));
return retryTemplate;
}
从属性来看,我只有这些(没有任何 dlq 配置)
spring.cloud.stream.bindings.input.destination = user-topic
spring.cloud.stream.bindings.input.group = user-consumer
重试失败后我得到了这个日志。
2020-06-01 20:05:58.674 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:56722]
2020-06-01 20:05:58.685 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#319c51b0:0/SimpleConnection@2a060201 [delegate=amqp://guest@127.0.0.1:56722/, localPort= 50728]
2020-06-01 20:05:58.697 INFO 18524 --- [idge-consumer-1] c.e.i.o.b.c.RetryConfiguration : retry finish
2020-06-01 20:05:58.702 ERROR 18524 --- [127.0.0.1:56722] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DLX' in vhost '/', class-id=60, method-id=40)
触发RetryListener关闭方法后,我可以看到监听器试图连接到DLX,可能是为了发布一条错误消息。而且我不希望它每次都在日志中观察此错误消息。
所以我的问题是:
1) 在哪里为我的 retryTemplate 添加 RecoveryCalback?据说我可以在 RetryListener#close 方法中通过将错误保存到数据库来编写恢复逻辑,但显然应该有更合适的方法来做到这一点。
2) 如何配置 rabbit-mq 绑定器不向 DLQ 发送消息,也许我可以重写一些方法?目前,在重试次数耗尽(或出现不可重试错误)后,侦听器会尝试向 DLX 发送消息并记录无法找到它的错误。我不需要在我的应用程序范围内将任何消息发送到 dlq,我只需要将它保存到 DB。
目前没有提供自定义恢复回调的机制。
将republishToDlq
设置为false
(以前是)。改为true
,如果autoBindDlq
为false(默认)则错误;我会为此打开一个问题。
然后,当重试次数耗尽时,将异常抛回给容器;您可以使用 ListenerContainerCustomizer
添加自定义 ErrorHandler
.
但是,您获得的数据将有一个 ListenerExecutionFailed
异常,原始(未转换)Spring AMQP Message
在其 failedMessage
属性 ,不是您的 User
对象。
编辑
您可以向绑定的错误通道添加侦听器...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So62137618Application {
public static void main(String[] args) {
SpringApplication.run(So62137618Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
throw new RuntimeException("test");
}
@ServiceActivator(inputChannel = "user-topic.user-consumer.errors")
public void errors(String in) {
System.out.println("Retries exhausted for " + new String((byte[]) in.getFailedMessage().getPayload()));
}
}
我正在使用spring云流+ rabbit mq binder。
在我的 @StreaListener 中,我想使用 RetryTemplate 对特定异常应用重试逻辑。在重试用尽或抛出不可重试错误后,我想添加一个恢复回调,它将带有错误消息的新记录保存到我的 Postgres 数据库中,并以消息结束(移至下一个)。 这是我到目前为止得到的:
@StreamListener(Sink.INPUT)
public void saveUser(User user) {
User user = userService.saveUser(user); //could throw exceptions
log.info(">>>>>>User is created successfully: {}", user);
}
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(ConnectionException.class, true);
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
//could add recovery logic here, like save error to db why sertain user was not saved
log.info("retries exausted");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
log.error("Error on retry", throwable);
}
});
retryTemplate.setRetryPolicy(
new SimpleRetryPolicy(properties.getRetriesCount(), retryableExceptions, true));
return retryTemplate;
}
从属性来看,我只有这些(没有任何 dlq 配置)
spring.cloud.stream.bindings.input.destination = user-topic
spring.cloud.stream.bindings.input.group = user-consumer
重试失败后我得到了这个日志。
2020-06-01 20:05:58.674 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:56722]
2020-06-01 20:05:58.685 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#319c51b0:0/SimpleConnection@2a060201 [delegate=amqp://guest@127.0.0.1:56722/, localPort= 50728]
2020-06-01 20:05:58.697 INFO 18524 --- [idge-consumer-1] c.e.i.o.b.c.RetryConfiguration : retry finish
2020-06-01 20:05:58.702 ERROR 18524 --- [127.0.0.1:56722] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DLX' in vhost '/', class-id=60, method-id=40)
触发RetryListener关闭方法后,我可以看到监听器试图连接到DLX,可能是为了发布一条错误消息。而且我不希望它每次都在日志中观察此错误消息。
所以我的问题是:
1) 在哪里为我的 retryTemplate 添加 RecoveryCalback?据说我可以在 RetryListener#close 方法中通过将错误保存到数据库来编写恢复逻辑,但显然应该有更合适的方法来做到这一点。
2) 如何配置 rabbit-mq 绑定器不向 DLQ 发送消息,也许我可以重写一些方法?目前,在重试次数耗尽(或出现不可重试错误)后,侦听器会尝试向 DLX 发送消息并记录无法找到它的错误。我不需要在我的应用程序范围内将任何消息发送到 dlq,我只需要将它保存到 DB。
目前没有提供自定义恢复回调的机制。
将republishToDlq
设置为false
(以前是)。改为true
,如果autoBindDlq
为false(默认)则错误;我会为此打开一个问题。
然后,当重试次数耗尽时,将异常抛回给容器;您可以使用 ListenerContainerCustomizer
添加自定义 ErrorHandler
.
但是,您获得的数据将有一个 ListenerExecutionFailed
异常,原始(未转换)Spring AMQP Message
在其 failedMessage
属性 ,不是您的 User
对象。
编辑
您可以向绑定的错误通道添加侦听器...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So62137618Application {
public static void main(String[] args) {
SpringApplication.run(So62137618Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
throw new RuntimeException("test");
}
@ServiceActivator(inputChannel = "user-topic.user-consumer.errors")
public void errors(String in) {
System.out.println("Retries exhausted for " + new String((byte[]) in.getFailedMessage().getPayload()));
}
}