Park XML message in invalid format to AMQP parking lot queue

Suppose I have the following IntegrationFlow:

IntegrationFlows.from(
        Amqp.inboundAdapter(rabbitConnectionFactory, QUEUE)
                .messageConverter(new MarshallingMessageConverter(xmlMarshaller))
                .defaultRequeueRejected(false)
                .concurrentConsumers(2)
                .maxConcurrentConsumers(4)
                .channelTransacted(true)
                .errorHandler(new ConditionalRejectingErrorHandler())
)
        .log(INFO, AMQP_LOGGER_CATEGORY)
        .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(deathCheckHandler))
                .subscribe(f -> f.handle(service))
        )
        .get();

where deathCheckHandler is

@Component
public class DeathCheckHandler {

    private static final Logger logger = LoggerFactory.getLogger(lookup().lookupClass());

    private static final int RETRY_COUNT = 3;
    private final RabbitTemplate rabbitTemplate;
    private final Jaxb2Marshaller xmlMarshaller;

    public DeathCheckHandler(RabbitTemplate rabbitTemplate, Jaxb2Marshaller xmlMarshaller) {
        this.rabbitTemplate = rabbitTemplate;
        this.xmlMarshaller = xmlMarshaller;
    }

    @ServiceActivator
    public void check(Message<?> message) {
        MessageHeaders headers = message.getHeaders();

        Optional<XDeath> rejected = findAnyRejectedXDeathMessageHeader(headers);
        if (rejected.isPresent()) {
            int rejectedCount = rejected.get().getCount();
            logger.debug("Rejected count is {}", rejectedCount);
            if (rejectedCount > RETRY_COUNT) {
                parkMessage(message);
            }
        }
    }

    private void parkMessage(Message<?> message) {
        Object payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();
        String parkingExchange = (String) headers.get("amqp_receivedExchange");
        String parkingRoutingKey = ((String) headers.get("amqp_consumerQueue")).replace("queue", "plq");
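        // Note: this reconfigures the shared RabbitTemplate on every call;
        // setting the converter once in the constructor would avoid that.
        rabbitTemplate.setMessageConverter(new MarshallingMessageConverter(xmlMarshaller));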
        logger.warn("Tried more than {} times. Parking rejected message: {} to exchange {} and routing key {}", RETRY_COUNT, payload, parkingExchange, parkingRoutingKey);
        rabbitTemplate.convertAndSend(parkingExchange, parkingRoutingKey, payload);
        // cause the message to be acknowledged and not routed to DLQ
        throw new ImmediateAcknowledgeAmqpException("Give up retrying message: " + payload);
    }
}

DeathCheckHandler deals with the dead lettering that is configured on the AMQP queue.

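How can I park XML messages that arrive in an invalid format, i.e. when MarshallingMessageConverter throws an UnmarshallingFailureException?

I would like to park them in a similar way to DeathCheckHandler#parkMessage.

ConditionalRejectingErrorHandler should be able to do this, but I don't know how.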

Clone ConditionalRejectingErrorHandler.

Use this method as a template...

@Override
public void handleError(Throwable t) {
    log(t);
    if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
        if (this.discardFatalsWithXDeath && t instanceof ListenerExecutionFailedException) {
            Message failed = ((ListenerExecutionFailedException) t).getFailedMessage();
            if (failed != null) {
                List<Map<String, ?>> xDeath = failed.getMessageProperties().getXDeathHeader();
                if (xDeath != null && xDeath.size() > 0) {
                    this.logger.error("x-death header detected on a message with a fatal exception; "
                            + "perhaps requeued from a DLQ? - discarding: " + failed);
                    throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present");
                }
            }
        }
        throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", this.rejectManual,
                t);
    }
}

By default, a fatal exception on a message that carries an x-death header causes the message to be discarded by throwing an ImmediateAcknowledgeAmqpException.

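It's not easy to subclass and override this method because the fields are private, so the simplest approach is to copy the class (and publish to the parking lot before throwing the IAAE).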
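The "publish, then acknowledge" ordering can be sketched in plain Java. This is only a model of the decision, not the Spring class itself: ParkingSketch, Outcome, handleFatal and the publisher callback are hypothetical names, and the callback stands in for a call such as rabbitTemplate.send(exchange, routingKey, failed). The routing key convention is the one used in DeathCheckHandler#parkMessage in the question.

```java
import java.util.function.BiConsumer;

/** Plain-Java model of the fatal branch of the copied error handler (hypothetical names). */
final class ParkingSketch {

    /** What the copied handler would signal to the listener container. */
    enum Outcome { PARKED_AND_ACKED, REJECTED }

    /** Returns true if t or any exception in its cause chain is an instance of type. */
    static boolean causedBy(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    /** Same naming convention as parkMessage in the question: "queue" becomes "plq". */
    static String parkingRoutingKey(String consumerQueue) {
        return consumerQueue.replace("queue", "plq");
    }

    /**
     * Models what happens once the exception has been classified as fatal.
     * The publisher receives (exchange, routingKey) and is an assumption of this sketch.
     */
    static Outcome handleFatal(boolean hasXDeath, String exchange, String consumerQueue,
            BiConsumer<String, String> publisher) {
        if (hasXDeath) {
            // Park first; the real handler would then throw
            // ImmediateAcknowledgeAmqpException so the message is acked, not dead-lettered again.
            publisher.accept(exchange, parkingRoutingKey(consumerQueue));
            return Outcome.PARKED_AND_ACKED;
        }
        // No x-death yet: the real handler would throw AmqpRejectAndDontRequeueException.
        return Outcome.REJECTED;
    }
}
```

In the copied handler, the exchange and queue name would presumably come from failed.getMessageProperties() (getReceivedExchange() and getConsumerQueue()), and causedBy could be used to park only when the failure is a conversion error.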

I will make some improvements to this class to make it easier to customize/override.

Pull Request.