消息传递到 QueueChannel 后如何处理错误?
How to handle errors after message has been handed off to QueueChannel?
我有 10 个 rabbitMQ 队列,分别称为 event.q.0、event.q.2、<...>、event.q.9。这些队列中的每一个都接收从 event.consistent-hash 交换路由的消息。我想构建一个容错解决方案,它将按顺序使用特定事件的消息,因为排序很重要。为此,我设置了一个流来侦听这些队列,并根据事件 ID 将消息路由到特定的工作流。工作流基于队列通道工作,因此应保证具有特定 ID 的事件的 FIFO 顺序。我想出了以下设置:
@Bean
public IntegrationFlow eventConsumerFlow(RabbitTemplate rabbitTemplate, Advice retryAdvice) {
return IntegrationFlows
.from(
Amqp.inboundAdapter(new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory()))
.configureContainer(c -> c
.adviceChain(retryAdvice())
.addQueueNames(queueNames)
.prefetchCount(amqpProperties.getPreMatch().getDefinition().getQueues().getEvent().getPrefetch())
)
.messageConverter(rabbitTemplate.getMessageConverter())
)
.<Event, String>route(e -> String.format("worker-input-%d", e.getId() % numberOfWorkers))
.get();
}
private Advice deadLetterAdvice() {
return RetryInterceptorBuilder
.stateless()
.maxAttempts(3)
.recoverer(recoverer())
.backOffPolicy(backOffPolicy())
.build();
}
private ExponentialBackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(3.0);
backOffPolicy.setMaxInterval(15000);
return backOffPolicy;
}
private MessageRecoverer recoverer() {
return new RepublishMessageRecoverer(
rabbitTemplate,
"error.exchange.dlx"
);
}
@PostConstruct
public void init() {
for (int i = 0; i < numberOfWorkers; i++) {
flowContext.registration(workerFlow(MessageChannels.queue(String.format("worker-input-%d", i), queueCapacity).get()))
.autoStartup(false)
.id(String.format("worker-flow-%d", i))
.register();
}
}
private IntegrationFlow workerFlow(QueueChannel channel) {
return IntegrationFlows
.from(channel)
.<Object, Class<?>>route(Object::getClass, m -> m
.resolutionRequired(true)
.defaultOutputToParentFlow()
.subFlowMapping(EventOne.class, s -> s.handle(oneHandler))
.subFlowMapping(EventTwo.class, s -> s.handle(anotherHandler))
)
.get();
}
现在,当 eventConsumerFlow
发生错误时,重试机制按预期工作,但当 workerFlow
发生错误时,重试不再起作用,消息也不再'被发送到死信交换。我认为这是因为一旦消息被传递到 QueueChannel,它就会自动得到确认。我怎样才能使重试机制在 workerFlow
中也起作用,以便如果那里发生异常,它可以重试几次并在尝试用完时向 DLX 发送消息?
如果你想要弹性,你根本不应该使用队列通道;将消息放入内存队列后,将立即确认消息;如果服务器崩溃,这些消息将丢失。
如果不希望消息丢失,您应该为每个队列配置一个单独的适配器。
也就是说,为了回答一般性问题,下游流(包括队列通道之后)的任何错误都将发送到入站适配器上定义的 errorChannel
。
我有 10 个 rabbitMQ 队列,分别称为 event.q.0、event.q.2、<...>、event.q.9。这些队列中的每一个都接收从 event.consistent-hash 交换路由的消息。我想构建一个容错解决方案,它将按顺序使用特定事件的消息,因为排序很重要。为此,我设置了一个流来侦听这些队列,并根据事件 ID 将消息路由到特定的工作流。工作流基于队列通道工作,因此应保证具有特定 ID 的事件的 FIFO 顺序。我想出了以下设置:
@Bean
public IntegrationFlow eventConsumerFlow(RabbitTemplate rabbitTemplate, Advice retryAdvice) {
return IntegrationFlows
.from(
Amqp.inboundAdapter(new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory()))
.configureContainer(c -> c
.adviceChain(retryAdvice())
.addQueueNames(queueNames)
.prefetchCount(amqpProperties.getPreMatch().getDefinition().getQueues().getEvent().getPrefetch())
)
.messageConverter(rabbitTemplate.getMessageConverter())
)
.<Event, String>route(e -> String.format("worker-input-%d", e.getId() % numberOfWorkers))
.get();
}
private Advice deadLetterAdvice() {
return RetryInterceptorBuilder
.stateless()
.maxAttempts(3)
.recoverer(recoverer())
.backOffPolicy(backOffPolicy())
.build();
}
private ExponentialBackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(3.0);
backOffPolicy.setMaxInterval(15000);
return backOffPolicy;
}
private MessageRecoverer recoverer() {
return new RepublishMessageRecoverer(
rabbitTemplate,
"error.exchange.dlx"
);
}
@PostConstruct
public void init() {
for (int i = 0; i < numberOfWorkers; i++) {
flowContext.registration(workerFlow(MessageChannels.queue(String.format("worker-input-%d", i), queueCapacity).get()))
.autoStartup(false)
.id(String.format("worker-flow-%d", i))
.register();
}
}
private IntegrationFlow workerFlow(QueueChannel channel) {
return IntegrationFlows
.from(channel)
.<Object, Class<?>>route(Object::getClass, m -> m
.resolutionRequired(true)
.defaultOutputToParentFlow()
.subFlowMapping(EventOne.class, s -> s.handle(oneHandler))
.subFlowMapping(EventTwo.class, s -> s.handle(anotherHandler))
)
.get();
}
现在,当 eventConsumerFlow
发生错误时,重试机制按预期工作,但当 workerFlow
发生错误时,重试不再起作用,消息也不再'被发送到死信交换。我认为这是因为一旦消息被传递到 QueueChannel,它就会自动得到确认。我怎样才能使重试机制在 workerFlow
中也起作用,以便如果那里发生异常,它可以重试几次并在尝试用完时向 DLX 发送消息?
如果你想要弹性,你根本不应该使用队列通道;将消息放入内存队列后,将立即确认消息;如果服务器崩溃,这些消息将丢失。
如果不希望消息丢失,您应该为每个队列配置一个单独的适配器。
也就是说,为了回答一般性问题,下游流(包括队列通道之后)的任何错误都将发送到入站适配器上定义的 errorChannel
。