在聚合器的 forceRelease 之后服务中发生异常时如何死信 RabbitMQ 消息
How to dead letter a RabbitMQ messages when an exceptions happens in a service after an aggregator's forceRelease
我正在尝试找出最好的方法来处理在聚合的组超时发生后调用的服务中可能发生的错误,该服务模拟了与满足 releaseExpression 相同的流。
这是我的设置:
我有一个 AmqpInboundChannelAdapter,它接收消息并将它们发送到我的聚合器。
当满足 releaseExpression 并且在 groupTimeout 到期之前,如果在我的 ServiceActivator 中抛出异常,消息将发送到我的死信队列以获取该 MessageGroup 中的所有消息。 (我下面的示例中有 10 条消息,仅用于说明目的)这是我所期望的。
如果未满足我的 releaseExpression 但已满足 groupTimeout 并且组超时,如果在我的 ServiceActivator 中抛出异常,则消息不会发送到我的死信队列并被确认。
阅读另一篇博客post后,
link1
它提到发生这种情况是因为处理发生在 MessageGroupStoreReaper 的另一个线程中,而不是 SimpleMessageListenerContainer 所在的线程。一旦处理离开 SimpleMessageListener 的线程,消息将被自动确认。
我添加了上面 link 中提到的配置,并看到错误消息被发送到我的错误处理程序。我的主要问题是什么被认为是处理这种情况以最大程度地减少消息丢失的最佳方法。
以下是我正在探索的选项:
在我的自定义错误处理程序中使用 BatchRabbitTemplate 将失败的消息发布到同一个死信队列,如果满足 releaseExpression,它们会去到同一个死信队列。 (这是我在下面概述的方法,但我担心消息丢失,如果在发布过程中发生错误)
调查是否可以让 SimpleMessageListener 知道发生的错误并让它将失败的消息批发送到死信队列?我怀疑这是可能的,因为消息似乎已经被确认了。
不要将 SimpleMessageListenerContainer 设置为 AcknowledgeMode.AUTO 并在满足 releaseExpression 或发生 groupTimeOut 时通过服务处理消息时手动确认消息。 (这看起来有点乱,因为 MessageGroup 中可以有 1..N 条消息,但想看看其他人做了什么)
理想情况下,我希望有一个流程可以在满足 releaseExpression 时模拟相同的流程,这样消息就不会丢失。
有没有人对处理他们过去使用过的这种情况的最佳方法有建议?
感谢任何帮助 and/or 建议!
这是我当前使用 Spring Integration DSL
的配置
@Bean
public SimpleMessageListenerContainer workListenerContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitConnectionFactory);
container.setQueues(worksQueue());
container.setConcurrentConsumers(4);
container.setDefaultRequeueRejected(false);
container.setTransactionManager(transactionManager);
container.setChannelTransacted(true);
container.setTxSize(10);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return container;
}
@Bean
public AmqpInboundChannelAdapter inboundRabbitMessages() {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(workListenerContainer());
return adapter;
}
我已经定义了一个错误通道并定义了我自己的 taskScheduler 以用于 MessageStoreRepear
@Bean
public ThreadPoolTaskScheduler taskScheduler(){
ThreadPoolTaskScheduler ts = new ThreadPoolTaskScheduler();
MessagePublishingErrorHandler mpe = new MessagePublishingErrorHandler();
mpe.setDefaultErrorChannel(myErrorChannel());
ts.setErrorHandler(mpe);
return ts;
}
@Bean
public PollableChannel myErrorChannel() {
return new QueueChannel();
}
public IntegrationFlow aggregationFlow() {
return IntegrationFlows.from(inboundRabbitMessages())
.transform(Transformers.fromJson(SomeObject.class))
.aggregate(a->{
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
}
)
.handle("someService", "processMessages")
.get();
}
这是我的自定义错误流程
@Bean
public IntegrationFlow errorResponse() {
return IntegrationFlows.from("myErrorChannel")
.<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
e -> e.poller(p -> p.fixedDelay(100)))
.channel("myErrorChannelHandler")
.handle("myErrorHandler","handleFailedMessage")
.log()
.get();
}
这是自定义错误处理程序
@Component
public class MyErrorHandler {
@Autowired
BatchingRabbitTemplate batchingRabbitTemplate;
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(Message<?> message) {
ArrayList<SomeObject> payload = (ArrayList<SomeObject>)message.getPayload();
payload.forEach(m->batchingRabbitTemplate.convertAndSend("some.dlq","#", m));
}
}
这是 BatchingRabbitTemplate bean
@Bean
public BatchingRabbitTemplate batchingRabbitTemplate() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.initialize();
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, Integer.MAX_VALUE, 30000);
BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, scheduler);
batchingRabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
return batchingRabbitTemplate;
}
更新 1) 以显示自定义 MessageGroupProcessor:
public class CustomAggregtingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
return group;
}
}
示例服务:
@Slf4j
public class SomeService {
@ServiceActivator
public void processMessages(MessageGroup messageGroup) throws IOException {
Collection<Message<?>> messages = messageGroup.getMessages();
//Do business logic
//ack messages in the group
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug(" deliveryTag = {}",deliveryTag);
log.debug("Channel = {}",channel);
channel.basicAck(deliveryTag, false);
}
}
}
已更新 integrationFlow
public IntegrationFlow aggregationFlowWithCustomMessageProcessor() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
要处理的新错误处理程序
public class MyErrorHandler {
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(MessageGroup messageGroup) throws IOException {
if(messageGroup!=null) {
log.debug("Nack messages size = {}", messageGroup.getMessages().size());
Collection<Message<?>> messages = messageGroup.getMessages();
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug("deliveryTag = {}",deliveryTag);
log.debug("channel = {}",channel);
channel.basicNack(deliveryTag, false, false);
}
}
}
}
更新 2 添加了自定义 ReleaseStratgedy 并更改为聚合器
public class CustomMeasureGroupReleaseStratgedy implements ReleaseStrategy {
private static final int MAX_MESSAGE_COUNT = 10;
public boolean canRelease(MessageGroup messageGroup) {
return messageGroup.getMessages().size() >= MAX_MESSAGE_COUNT;
}
}
public IntegrationFlow aggregationFlowWithCustomMessageProcessorAndReleaseStratgedy() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.transactional(true);
a.releaseStrategy(new CustomMeasureGroupReleaseStratgedy());
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
你的understanding.If有一些缺陷,你使用AUTO,出现异常时只有最后一条消息会死信。入群成功的消息,在发布前,会立即确认。
实现您想要的唯一方法是使用 MANUAL acks。
没办法"tell the listener container to send messages to the DLQ"。容器从不向 DLQ 发送消息,它拒绝消息并且代理将它发送到 DLX/DLQ.
我正在尝试找出最好的方法来处理在聚合的组超时发生后调用的服务中可能发生的错误,该服务模拟了与满足 releaseExpression 相同的流。
这是我的设置:
我有一个 AmqpInboundChannelAdapter,它接收消息并将它们发送到我的聚合器。
当满足 releaseExpression 并且在 groupTimeout 到期之前,如果在我的 ServiceActivator 中抛出异常,消息将发送到我的死信队列以获取该 MessageGroup 中的所有消息。 (我下面的示例中有 10 条消息,仅用于说明目的)这是我所期望的。
如果未满足我的 releaseExpression 但已满足 groupTimeout 并且组超时,如果在我的 ServiceActivator 中抛出异常,则消息不会发送到我的死信队列并被确认。
阅读另一篇博客post后, link1 它提到发生这种情况是因为处理发生在 MessageGroupStoreReaper 的另一个线程中,而不是 SimpleMessageListenerContainer 所在的线程。一旦处理离开 SimpleMessageListener 的线程,消息将被自动确认。
我添加了上面 link 中提到的配置,并看到错误消息被发送到我的错误处理程序。我的主要问题是什么被认为是处理这种情况以最大程度地减少消息丢失的最佳方法。
以下是我正在探索的选项:
在我的自定义错误处理程序中使用 BatchRabbitTemplate 将失败的消息发布到同一个死信队列,如果满足 releaseExpression,它们会去到同一个死信队列。 (这是我在下面概述的方法,但我担心消息丢失,如果在发布过程中发生错误)
调查是否可以让 SimpleMessageListener 知道发生的错误并让它将失败的消息批发送到死信队列?我怀疑这是可能的,因为消息似乎已经被确认了。
不要将 SimpleMessageListenerContainer 设置为 AcknowledgeMode.AUTO 并在满足 releaseExpression 或发生 groupTimeOut 时通过服务处理消息时手动确认消息。 (这看起来有点乱,因为 MessageGroup 中可以有 1..N 条消息,但想看看其他人做了什么)
理想情况下,我希望有一个流程可以在满足 releaseExpression 时模拟相同的流程,这样消息就不会丢失。
有没有人对处理他们过去使用过的这种情况的最佳方法有建议?
感谢任何帮助 and/or 建议!
这是我当前使用 Spring Integration DSL
的配置@Bean
public SimpleMessageListenerContainer workListenerContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitConnectionFactory);
container.setQueues(worksQueue());
container.setConcurrentConsumers(4);
container.setDefaultRequeueRejected(false);
container.setTransactionManager(transactionManager);
container.setChannelTransacted(true);
container.setTxSize(10);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return container;
}
@Bean
public AmqpInboundChannelAdapter inboundRabbitMessages() {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(workListenerContainer());
return adapter;
}
我已经定义了一个错误通道并定义了我自己的 taskScheduler 以用于 MessageStoreRepear
@Bean
public ThreadPoolTaskScheduler taskScheduler(){
ThreadPoolTaskScheduler ts = new ThreadPoolTaskScheduler();
MessagePublishingErrorHandler mpe = new MessagePublishingErrorHandler();
mpe.setDefaultErrorChannel(myErrorChannel());
ts.setErrorHandler(mpe);
return ts;
}
@Bean
public PollableChannel myErrorChannel() {
return new QueueChannel();
}
public IntegrationFlow aggregationFlow() {
return IntegrationFlows.from(inboundRabbitMessages())
.transform(Transformers.fromJson(SomeObject.class))
.aggregate(a->{
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
}
)
.handle("someService", "processMessages")
.get();
}
这是我的自定义错误流程
@Bean
public IntegrationFlow errorResponse() {
return IntegrationFlows.from("myErrorChannel")
.<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
e -> e.poller(p -> p.fixedDelay(100)))
.channel("myErrorChannelHandler")
.handle("myErrorHandler","handleFailedMessage")
.log()
.get();
}
这是自定义错误处理程序
@Component
public class MyErrorHandler {
@Autowired
BatchingRabbitTemplate batchingRabbitTemplate;
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(Message<?> message) {
ArrayList<SomeObject> payload = (ArrayList<SomeObject>)message.getPayload();
payload.forEach(m->batchingRabbitTemplate.convertAndSend("some.dlq","#", m));
}
}
这是 BatchingRabbitTemplate bean
@Bean
public BatchingRabbitTemplate batchingRabbitTemplate() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.initialize();
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, Integer.MAX_VALUE, 30000);
BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, scheduler);
batchingRabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
return batchingRabbitTemplate;
}
更新 1) 以显示自定义 MessageGroupProcessor:
public class CustomAggregtingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
return group;
}
}
示例服务:
@Slf4j
public class SomeService {
@ServiceActivator
public void processMessages(MessageGroup messageGroup) throws IOException {
Collection<Message<?>> messages = messageGroup.getMessages();
//Do business logic
//ack messages in the group
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug(" deliveryTag = {}",deliveryTag);
log.debug("Channel = {}",channel);
channel.basicAck(deliveryTag, false);
}
}
}
已更新 integrationFlow
public IntegrationFlow aggregationFlowWithCustomMessageProcessor() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
要处理的新错误处理程序
public class MyErrorHandler {
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(MessageGroup messageGroup) throws IOException {
if(messageGroup!=null) {
log.debug("Nack messages size = {}", messageGroup.getMessages().size());
Collection<Message<?>> messages = messageGroup.getMessages();
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug("deliveryTag = {}",deliveryTag);
log.debug("channel = {}",channel);
channel.basicNack(deliveryTag, false, false);
}
}
}
}
更新 2 添加了自定义 ReleaseStratgedy 并更改为聚合器
public class CustomMeasureGroupReleaseStratgedy implements ReleaseStrategy {
private static final int MAX_MESSAGE_COUNT = 10;
public boolean canRelease(MessageGroup messageGroup) {
return messageGroup.getMessages().size() >= MAX_MESSAGE_COUNT;
}
}
public IntegrationFlow aggregationFlowWithCustomMessageProcessorAndReleaseStratgedy() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.transactional(true);
a.releaseStrategy(new CustomMeasureGroupReleaseStratgedy());
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
你的understanding.If有一些缺陷,你使用AUTO,出现异常时只有最后一条消息会死信。入群成功的消息,在发布前,会立即确认。
实现您想要的唯一方法是使用 MANUAL acks。
没办法"tell the listener container to send messages to the DLQ"。容器从不向 DLQ 发送消息,它拒绝消息并且代理将它发送到 DLX/DLQ.