Spring Kafka 消费者 - 使用恢复回调机制手动提交

Spring Kafka consumer - manual commit with recovery callback mechanism

我正在构建一个 kafka 消费者。我已经设置了类似于下面的恢复回调。我启用了手动提交。如何在恢复回调方法中确认消息才不会出现延迟。

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(conncurrency);
        factory.setConsumerFactory(consumerFactory());
        factory.setRetryTemplate(retryTemplate());
                factory.setRecoveryCallback(new RecoveryCallback<Object>() {
        @Override
        public Object recover(RetryContext context) throws Exception {
            // TODO Auto-generated method stub
            logger.debug(" In recovery callback method !!");
            return null;
        }
    });
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    /*
     * Retry template.
     */

    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
        return policy;
    }

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(initialRetryInterval);
        policy.setMultiplier(retryMultiplier);
        return policy;
    }

    protected RetryTemplate retryTemplate() {
       RetryTemplate template = new RetryTemplate();
       template.setRetryPolicy(retryPolicy());
       template.setBackOffPolicy(backOffPolicy());
       return template;
    }
}

你的问题太宽泛了。你需要更具体。

框架中没有任何假设在消费错误期间重试耗尽的情况下您可以做什么。

我认为您应该从 Spring Retry 项目开始,以了解 RecoveryCallback 到底是什么以及它是如何工作的:

If the business logic does not succeed before the template decides to abort, then the client is given the chance to do some alternate processing through the recovery callback.

A RetryContext 有:

/**
 * Accessor for the exception object that caused the current retry.
 * 
 * @return the last exception that caused a retry, or possibly null. It will be null
 * if this is the first attempt, but also if the enclosing policy decides not to
 * provide it (e.g. because of concerns about memory usage).
 */
Throwable getLastThrowable();

另外 Spring Kafka 填充额外的属性到 RetryContext 以在 RecoveryCallback 中处理:https://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#_retrying_deliveries

The contents of the RetryContext passed into the RecoveryCallback will depend on the type of listener. The context will always have an attribute record which is the record for which the failure occurred. If your listener is acknowledging and/or consumer aware, additional attributes acknowledgment and/or consumer will be available. For convenience, the RetryingAcknowledgingMessageListenerAdapter provides static constants for these keys. See its javadocs for more information.