重新排队kafka主题中的失败记录

Requeue the failed record in the kafka topic

我有一个用例,其中记录要保存在 table 中,它本身有外键。

示例:

z对象 { uid, 名称, parentuid }

parent uid 也出现在同一个 table 中,任何 object 不存在的 parentuid 将无法持久化。

有时记录在主题中的放置方式使得依赖关系不在列表的开头,而是在依赖记录出现之后

这将导致处理记录失败。我使用了 seektocurrenterrorhandler,它实际上为给定的退避重试相同的失败记录,但由于不满足依赖关系而失败。

有什么方法可以在主题末尾重新排队记录以满足依赖性?如果在入队后仍然失败了 5 天,则可以将记录推送到 DLT。

谢谢, 拉贾塞卡

没有内置任何东西;但是,您可以根据失败记录中的 header,在 DeadLetterPublishingRecoverer 中使用自定义目标解析器来确定要发布到哪个主题。

https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#dead-letters

编辑

@SpringBootApplication
public class So64646996Application {

    public static void main(String[] args) {
        SpringApplication.run(So64646996Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so64646996").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic dlt() {
        return TopicBuilder.name("so64646996.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    public ErrorHandler eh(KafkaOperations<String, String> template) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template,
                (rec, ex) -> {
                    org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                    if (retries == null) {
                        retries = new RecordHeader("retries", new byte[] { 1 });
                        rec.headers().add(retries);
                    }
                    else {
                        retries.value()[0]++;
                    }
                    return retries.value()[0] > 5
                            ? new TopicPartition("so64646996.DLT", rec.partition())
                            : new TopicPartition("so64646996", rec.partition());
                }), new FixedBackOff(0L, 0L));
    }


    @KafkaListener(id = "so64646996", topics = "so64646996")
    public void listen(String in,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(name = "retries", required = false) byte[] retry) {

        System.out.println(in + "@" + offset + ":" + retry[0]);
        throw new IllegalStateException();
    }

    @KafkaListener(id = "so64646996.DLT", topics = "so64646996.DLT")
    public void listenDLT(String in,
            @Header(KafkaHeaders.OFFSET) long offset,
            @Header(name = "retries", required = false) byte[] retry) {

        System.out.println("DLT: " + in + "@" + offset + ":" + retry[0]);
    }


    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> System.out.println(template.send("so64646996", "foo").get(10, TimeUnit.SECONDS)
                .getRecordMetadata());
    }

}