Kafka Consumer - 在 recover 方法中获取监听器接收到的参数
Kafka Consumer - Fetch parameters received by listener in recover method
我正在构建一个 spring kafka 消费者。我已经设置了重试机制。重试结束后,我想将失败的消息推送到死信主题。
监听方法有以下参数
public void listen(@Payload Map<String, Object> conciseMap,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
Acknowledgment ack) throws JsonProcessingException {
作为恢复方法的一部分,我想获取作为输入传递给侦听器映射的 conciseMap 或我的主题接收到的原始消息。有办法吗?
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
// TODO Auto-generated method stub
logger.debug(" In recovery callback method !!");
((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge();
return null;
}
});
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
/*
* Retry template.
*/
protected RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
return policy;
}
protected BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(initialRetryInterval);
policy.setMultiplier(retryMultiplier);
return policy;
}
protected RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
}
您无法在 RecoveryCallback
的 RetryContext
中获取转换后的 conciseMap
,但您可以检索 ConsumerRecord
,这是之前主题中的原始内容转换:
(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)
我正在构建一个 spring kafka 消费者。我已经设置了重试机制。重试结束后,我想将失败的消息推送到死信主题。
监听方法有以下参数
public void listen(@Payload Map<String, Object> conciseMap,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
Acknowledgment ack) throws JsonProcessingException {
作为恢复方法的一部分,我想获取作为输入传递给侦听器映射的 conciseMap 或我的主题接收到的原始消息。有办法吗?
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(conncurrency);
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
// TODO Auto-generated method stub
logger.debug(" In recovery callback method !!");
((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge();
return null;
}
});
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
/*
* Retry template.
*/
protected RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
return policy;
}
protected BackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(initialRetryInterval);
policy.setMultiplier(retryMultiplier);
return policy;
}
protected RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(retryPolicy());
template.setBackOffPolicy(backOffPolicy());
return template;
}
}
您无法在 RecoveryCallback
的 RetryContext
中获取转换后的 conciseMap
,但您可以检索 ConsumerRecord
,这是之前主题中的原始内容转换:
(ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD)