重新排队kafka主题中的失败记录
Requeue the failed record in the kafka topic
我有一个用例,其中记录要保存在 table 中,它本身有外键。
示例:
z对象
{
uid,
名称,
parentuid
}
parent uid 也出现在同一个 table 中,任何 object 不存在的 parentuid 将无法持久化。
有时记录在主题中的放置方式使得依赖关系不在列表的开头,而是在依赖记录出现之后
这将导致处理记录失败。我使用了 seektocurrenterrorhandler,它实际上为给定的退避重试相同的失败记录,但由于不满足依赖关系而失败。
有什么方法可以在主题末尾重新排队记录以满足依赖性?如果在入队后仍然失败了 5 天,则可以将记录推送到 DLT。
谢谢,
拉贾塞卡
没有内置任何东西;但是,您可以根据失败记录中的 header,在 DeadLetterPublishingRecoverer
中使用自定义目标解析器来确定要发布到哪个主题。
见https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#dead-letters
编辑
@SpringBootApplication
public class So64646996Application {
public static void main(String[] args) {
SpringApplication.run(So64646996Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so64646996").partitions(1).replicas(1).build();
}
@Bean
public NewTopic dlt() {
return TopicBuilder.name("so64646996.DLT").partitions(1).replicas(1).build();
}
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("so64646996.DLT", rec.partition())
: new TopicPartition("so64646996", rec.partition());
}), new FixedBackOff(0L, 0L));
}
@KafkaListener(id = "so64646996", topics = "so64646996")
public void listen(String in,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(name = "retries", required = false) byte[] retry) {
System.out.println(in + "@" + offset + ":" + retry[0]);
throw new IllegalStateException();
}
@KafkaListener(id = "so64646996.DLT", topics = "so64646996.DLT")
public void listenDLT(String in,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(name = "retries", required = false) byte[] retry) {
System.out.println("DLT: " + in + "@" + offset + ":" + retry[0]);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> System.out.println(template.send("so64646996", "foo").get(10, TimeUnit.SECONDS)
.getRecordMetadata());
}
}
我有一个用例,其中记录要保存在 table 中,它本身有外键。
示例:
z对象 { uid, 名称, parentuid }
parent uid 也出现在同一个 table 中,任何 object 不存在的 parentuid 将无法持久化。
有时记录在主题中的放置方式使得依赖关系不在列表的开头,而是在依赖记录出现之后
这将导致处理记录失败。我使用了 seektocurrenterrorhandler,它实际上为给定的退避重试相同的失败记录,但由于不满足依赖关系而失败。
有什么方法可以在主题末尾重新排队记录以满足依赖性?如果在入队后仍然失败了 5 天,则可以将记录推送到 DLT。
谢谢, 拉贾塞卡
没有内置任何东西;但是,您可以根据失败记录中的 header,在 DeadLetterPublishingRecoverer
中使用自定义目标解析器来确定要发布到哪个主题。
见https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#dead-letters
编辑
@SpringBootApplication
public class So64646996Application {
public static void main(String[] args) {
SpringApplication.run(So64646996Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so64646996").partitions(1).replicas(1).build();
}
@Bean
public NewTopic dlt() {
return TopicBuilder.name("so64646996.DLT").partitions(1).replicas(1).build();
}
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("so64646996.DLT", rec.partition())
: new TopicPartition("so64646996", rec.partition());
}), new FixedBackOff(0L, 0L));
}
@KafkaListener(id = "so64646996", topics = "so64646996")
public void listen(String in,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(name = "retries", required = false) byte[] retry) {
System.out.println(in + "@" + offset + ":" + retry[0]);
throw new IllegalStateException();
}
@KafkaListener(id = "so64646996.DLT", topics = "so64646996.DLT")
public void listenDLT(String in,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(name = "retries", required = false) byte[] retry) {
System.out.println("DLT: " + in + "@" + offset + ":" + retry[0]);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> System.out.println(template.send("so64646996", "foo").get(10, TimeUnit.SECONDS)
.getRecordMetadata());
}
}