处理毒丸的不同方法

Different way of handling poison pill

我正在尝试使用 spring-kafka 处理毒丸场景。 目前我正在用下面的方法处理这个问题,这里失败的消息被推送到另一个名为 <original-topic>.DLT.

的主题
  @Bean
  public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
  }
  
  @Bean
  public DeadLetterPublishingRecoverer publisher(KafkaTemplate bytesTemplate) {
    return new DeadLetterPublishingRecoverer(bytesTemplate);
  }

所以我不想将失败的消息推送到 <original-topic>.DLT 主题,而是想获取它并将其直接推送到数据库中。 我试图获取失败的消息但没有成功。有人可以帮忙吗? TIA.

只需实现您自己的 ConsumerRecordRecoverer 并在错误处理程序中使用它而不是 DeadLetterPublishingRecoverer

/**
 * A {@link BiConsumer} extension for recovering consumer records.
 *
 * @author Gary Russell
 * @since 2.3
 *
 */
@FunctionalInterface
public interface ConsumerRecordRecoverer extends BiConsumer<ConsumerRecord<?, ?>, Exception> {

}