在 IntegrationFlow 服务激活器方法 returns 成功之前不要确认 RabbitMQ 消息?
Don't ack RabbitMQ messages until IntegrationFlow service activator method returns succesfully?
我有一个这样定义的集成流程:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
其中 serviceActivatorBean
定义如下:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(Collection<MyEvent> events) {
....
}
}
而requestHandlerRetryAdviceForIntegrationFlow()
是这样定义的:
public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(MAX_VALUE);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() {
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(),
context.getRetryCount(), throwable);
}
}});
advice.setRetryTemplate(retryTemplate);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setMaxInterval(5000L);
backOffPolicy.setInitialInterval(200L);
backOffPolicy.setMultiplier(2);
retryTemplate.setBackOffPolicy(backOffPolicy);
return advice;
}
我们面临的问题是当服务激活器中的 events
集合包含 2 个或更多事件时,由于某种原因 myMethod
的处理失败并且服务器崩溃。似乎发生的是 IntegrationFlow
一次从 RabbitMQ 消费并确认一条消息,因此如果服务器在处理 myMethod
期间崩溃,除了最后一个事件之外的所有事件都会丢失。这对我们来说既不好也不安全。我们可以做些什么来配置 IntegrationFlow
在服务激活器中的 myMethod
成功完成之前不确认任何消息?
您可以使用手动确认模式,然后通过 headers 确认:
我有一个这样定义的集成流程:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
其中 serviceActivatorBean
定义如下:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(Collection<MyEvent> events) {
....
}
}
而requestHandlerRetryAdviceForIntegrationFlow()
是这样定义的:
public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(MAX_VALUE);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() {
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(),
context.getRetryCount(), throwable);
}
}});
advice.setRetryTemplate(retryTemplate);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setMaxInterval(5000L);
backOffPolicy.setInitialInterval(200L);
backOffPolicy.setMultiplier(2);
retryTemplate.setBackOffPolicy(backOffPolicy);
return advice;
}
我们面临的问题是当服务激活器中的 events
集合包含 2 个或更多事件时,由于某种原因 myMethod
的处理失败并且服务器崩溃。似乎发生的是 IntegrationFlow
一次从 RabbitMQ 消费并确认一条消息,因此如果服务器在处理 myMethod
期间崩溃,除了最后一个事件之外的所有事件都会丢失。这对我们来说既不好也不安全。我们可以做些什么来配置 IntegrationFlow
在服务激活器中的 myMethod
成功完成之前不确认任何消息?
您可以使用手动确认模式,然后通过 headers 确认: