Kafka Consumer - 在 recover 方法中获取监听器接收到的参数

Kafka Consumer - Fetch parameters received by listener in recover method

我正在构建一个 spring kafka 消费者。我已经设置了重试机制。重试结束后,我想将失败的消息推送到死信主题。

监听方法有以下参数

public void listen(@Payload Map<String, Object> conciseMap,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        Acknowledgment ack) throws JsonProcessingException {

作为恢复方法的一部分,我想获取作为输入传递给侦听器映射的 conciseMap 或我的主题接收到的原始消息。有办法吗?

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(conncurrency);
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(new RecoveryCallback<Object>() {
        @Override
        public Object recover(RetryContext context) throws Exception {
            // TODO Auto-generated method stub
            logger.debug(" In recovery callback method !!");
            ((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge();

            return null;
        }
    });
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    return factory;
}

        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    /*
     * Retry template.
     */

    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
        return policy;
    }

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(initialRetryInterval);
        policy.setMultiplier(retryMultiplier);
        return policy;
    }

    protected RetryTemplate retryTemplate() {
       RetryTemplate template = new RetryTemplate();
       template.setRetryPolicy(retryPolicy());
       template.setBackOffPolicy(backOffPolicy());
       return template;
    }
}

您无法在 RecoveryCallbackRetryContext 中获取转换后的 conciseMap,但您可以检索 ConsumerRecord,这是之前主题中的原始内容转换:

(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)