在聚合器的 forceRelease 之后服务中发生异常时如何死信 RabbitMQ 消息

How to dead letter a RabbitMQ messages when an exceptions happens in a service after an aggregator's forceRelease

我正在尝试找出最好的方法来处理在聚合的组超时发生后调用的服务中可能发生的错误,该服务模拟了与满足 releaseExpression 相同的流。

这是我的设置:

我有一个 AmqpInboundChannelAdapter,它接收消息并将它们发送到我的聚合器。

当满足 releaseExpression 并且在 groupTimeout 到期之前,如果在我的 ServiceActivator 中抛出异常,消息将发送到我的死信队列以获取该 MessageGroup 中的所有消息。 (我下面的示例中有 10 条消息,仅用于说明目的)这是我所期望的。

如果未满足我的 releaseExpression 但已满足 groupTimeout 并且组超时,如果在我的 ServiceActivator 中抛出异常,则消息不会发送到我的死信队列并被确认。

阅读另一篇博客post后, link1 它提到发生这种情况是因为处理发生在 MessageGroupStoreReaper 的另一个线程中,而不是 SimpleMessageListenerContainer 所在的线程。一旦处理离开 SimpleMessageListener 的线程,消息将被自动确认。

我添加了上面 link 中提到的配置,并看到错误消息被发送到我的错误处理程序。我的主要问题是什么被认为是处理这种情况以最大程度地减少消息丢失的最佳方法。

以下是我正在探索的选项:

理想情况下,我希望有一个流程可以在满足 releaseExpression 时模拟相同的流程,这样消息就不会丢失。

有没有人对处理他们过去使用过的这种情况的最佳方法有建议?

感谢任何帮助 and/or 建议!

这是我当前使用 Spring Integration DSL

的配置
@Bean
    public SimpleMessageListenerContainer workListenerContainer() {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(rabbitConnectionFactory);
        container.setQueues(worksQueue());
        container.setConcurrentConsumers(4);
        container.setDefaultRequeueRejected(false);
        container.setTransactionManager(transactionManager);
        container.setChannelTransacted(true);
        container.setTxSize(10);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);          
        return container;
    }

  @Bean
    public AmqpInboundChannelAdapter inboundRabbitMessages() {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(workListenerContainer());       
        return adapter;
    }

我已经定义了一个错误通道并定义了我自己的 taskScheduler 以用于 MessageStoreRepear

   @Bean 
    public ThreadPoolTaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler  ts = new ThreadPoolTaskScheduler();
        MessagePublishingErrorHandler mpe = new MessagePublishingErrorHandler();
        mpe.setDefaultErrorChannel(myErrorChannel());
        ts.setErrorHandler(mpe);
        return ts;
    }


    @Bean
    public PollableChannel myErrorChannel() {
        return new QueueChannel();
    }
 public IntegrationFlow aggregationFlow() {
        return IntegrationFlows.from(inboundRabbitMessages())               
                .transform(Transformers.fromJson(SomeObject.class))             
                 .aggregate(a->{
                    a.sendPartialResultOnExpiry(true);                  
                    a.groupTimeout(3000);   
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);                    
                    a.correlationExpression("T(Thread).currentThread().id");
                    a.releaseExpression("size() == 10");                            
                    a.transactional(true);
                 }
                )               
                .handle("someService", "processMessages")
                .get();
    }

这是我的自定义错误流程

@Bean
    public IntegrationFlow errorResponse() {
        return IntegrationFlows.from("myErrorChannel")
                    .<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
                            e -> e.poller(p -> p.fixedDelay(100)))
                    .channel("myErrorChannelHandler")
                    .handle("myErrorHandler","handleFailedMessage")
                    .log()
                    .get();
    }

这是自定义错误处理程序

@Component
public class MyErrorHandler {

    @Autowired
    BatchingRabbitTemplate batchingRabbitTemplate;

    @ServiceActivator(inputChannel = "myErrorChannelHandler")
    public void handleFailedMessage(Message<?> message) {       
        ArrayList<SomeObject> payload = (ArrayList<SomeObject>)message.getPayload();        
        payload.forEach(m->batchingRabbitTemplate.convertAndSend("some.dlq","#", m));
    }

}

这是 BatchingRabbitTemplate bean

    @Bean   
    public BatchingRabbitTemplate batchingRabbitTemplate() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(5);
        scheduler.initialize();
        BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, Integer.MAX_VALUE, 30000);
        BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, scheduler);    
        batchingRabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
        return batchingRabbitTemplate;
    }

更新 1) 以显示自定义 MessageGroupProcessor:

public class CustomAggregtingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
    @Override
    protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
        return group;
    }
}

示例服务:

@Slf4j
public class SomeService  {
    @ServiceActivator
    public void processMessages(MessageGroup messageGroup) throws IOException {
        Collection<Message<?>> messages  = messageGroup.getMessages();
        //Do business logic 
        //ack messages in the group
        for (Message<?> m : messages) {
            com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel) 
                    m.getHeaders().get("amqp_channel");
            long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
            log.debug(" deliveryTag = {}",deliveryTag);
            log.debug("Channel = {}",channel);
            channel.basicAck(deliveryTag, false);
        }
    }
}

已更新 integrationFlow

public IntegrationFlow aggregationFlowWithCustomMessageProcessor() {
        return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
                .aggregate(a -> {
                    a.sendPartialResultOnExpiry(true);
                    a.groupTimeout(3000);
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);
                    a.correlationExpression("T(Thread).currentThread().id");
                    a.releaseExpression("size() == 10");
                    a.transactional(true);
                    a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
                }).handle("someService", "processMessages").get();
    }

要处理的新错误处理程序

public class MyErrorHandler {

    @ServiceActivator(inputChannel = "myErrorChannelHandler")
    public void handleFailedMessage(MessageGroup messageGroup) throws IOException {
        if(messageGroup!=null) {
            log.debug("Nack messages size = {}", messageGroup.getMessages().size());
            Collection<Message<?>> messages  = messageGroup.getMessages();
            for (Message<?> m : messages) {
                com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel) 
                        m.getHeaders().get("amqp_channel");
                long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");           
                log.debug("deliveryTag = {}",deliveryTag);
                log.debug("channel = {}",channel);
                channel.basicNack(deliveryTag, false, false);
            }       
        }
    }
}

更新 2 添加了自定义 ReleaseStratgedy 并更改为聚合器

public class CustomMeasureGroupReleaseStratgedy implements ReleaseStrategy {

    private static final int MAX_MESSAGE_COUNT = 10;

    public boolean canRelease(MessageGroup messageGroup) {
        return messageGroup.getMessages().size() >= MAX_MESSAGE_COUNT;
    }
}
   public IntegrationFlow aggregationFlowWithCustomMessageProcessorAndReleaseStratgedy() {
        return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
                .aggregate(a -> {
                    a.sendPartialResultOnExpiry(true);
                    a.groupTimeout(3000);
                    a.expireGroupsUponCompletion(true);
                    a.expireGroupsUponTimeout(true);
                    a.correlationExpression("T(Thread).currentThread().id");                   
                    a.transactional(true);
                    a.releaseStrategy(new CustomMeasureGroupReleaseStratgedy());            
                    a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
                }).handle("someService", "processMessages").get();
    }

你的understanding.If有一些缺陷,你使用AUTO,出现异常时只有最后一条消息会死信。入群成功的消息,在发布前,会立即确认。

实现您想要的唯一方法是使用 MANUAL acks。

没办法"tell the listener container to send messages to the DLQ"。容器从不向 DLQ 发送消息,它拒绝消息并且代理将它发送到 DLX/DLQ.