处理毒丸的不同方法
Different way of handling poison pill
我正在尝试使用 spring-kafka 处理毒丸场景。
目前我正在用下面的方法处理这个问题,这里失败的消息被推送到另一个名为 <original-topic>.DLT
.
的主题
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
所以我不想将失败的消息推送到 <original-topic>.DLT
主题,而是想获取它并将其直接推送到数据库中。
我试图获取失败的消息但没有成功。有人可以帮忙吗? TIA.
只需实现您自己的 ConsumerRecordRecoverer
并在错误处理程序中使用它而不是 DeadLetterPublishingRecoverer
。
/**
* A {@link BiConsumer} extension for recovering consumer records.
*
* @author Gary Russell
* @since 2.3
*
*/
@FunctionalInterface
public interface ConsumerRecordRecoverer extends BiConsumer<ConsumerRecord<?, ?>, Exception> {
}
我正在尝试使用 spring-kafka 处理毒丸场景。
目前我正在用下面的方法处理这个问题,这里失败的消息被推送到另一个名为 <original-topic>.DLT
.
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
所以我不想将失败的消息推送到 <original-topic>.DLT
主题,而是想获取它并将其直接推送到数据库中。
我试图获取失败的消息但没有成功。有人可以帮忙吗? TIA.
只需实现您自己的 ConsumerRecordRecoverer
并在错误处理程序中使用它而不是 DeadLetterPublishingRecoverer
。
/**
* A {@link BiConsumer} extension for recovering consumer records.
*
* @author Gary Russell
* @since 2.3
*
*/
@FunctionalInterface
public interface ConsumerRecordRecoverer extends BiConsumer<ConsumerRecord<?, ?>, Exception> {
}