How to publish to a Spring Kafka DLQ in version 2.5.4
I need your help and guidance.
I am using spring-kafka 2.2.x in my current project.
I set up error handling like this:
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
    return factory;
}

public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
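Then I upgraded all my project dependencies, such as spring-boot and spring-kafka, to the latest version: 2.5.4.RELEASE.
I found that some methods have been deprecated or changed.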
SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));
My question is: how do I publish to a DLQ with this configuration?
Edited:
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(consumerConcurrencyCount);
    factory.setErrorHandler(errorHandler());
    return factory;
}

public SeekToCurrentErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler(
            deadLetterPublishingRecoverer(),
            new FixedBackOff(0L, 2L)
    );
}

public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(
            getEventKafkaTemplate(),
            (record, ex) -> {
                if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
                    return new TopicPartition("topic-undelivered", -1);
                }
                return new TopicPartition("topic-fail", -1);
            });
}

public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}
Thanks Gary, this configuration works!
Thanks in advance.
It's not clear what you mean.
The problem is that the documentation still uses the old method, which is deprecated in version 2.5.x.
KafkaOperations is the interface that KafkaTemplate implements; the only change you need to make is to replace maxAttempts with a BackOff
...
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0L, 2L)));
    return factory;
}

public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
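
To make the maxAttempts-to-BackOff change concrete, here is a minimal sketch in plain Java with no Spring dependency. It assumes, based on my reading of the Spring Kafka reference documentation, that the old integer argument counted total delivery attempts, while the second FixedBackOff argument counts retries after the first attempt, so the old 3 corresponds to 2L. The class and method names (BackOffMigrationSketch, toMaxRetries, simulate) are hypothetical and exist only for illustration; the loop is a simulation, not the framework's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffMigrationSketch {

    /**
     * Hypothetical helper: converts the old "max failures" count (total
     * delivery attempts) into the retry count that would be passed as the
     * second argument of FixedBackOff. Assumption: retries = attempts - 1.
     */
    public static long toMaxRetries(int maxFailures) {
        if (maxFailures < 1) {
            throw new IllegalArgumentException("maxFailures must be at least 1");
        }
        return maxFailures - 1L;
    }

    /**
     * Simulates a record whose listener always throws: one initial attempt,
     * then maxRetries retries, then a hand-off to the recoverer.
     */
    public static List<String> simulate(long maxRetries) {
        List<String> events = new ArrayList<>();
        for (long attempt = 1; attempt <= maxRetries + 1; attempt++) {
            events.add("attempt " + attempt + " failed");
        }
        events.add("recoverer invoked (publish to dead-letter topic)");
        return events;
    }

    public static void main(String[] args) {
        long retries = toMaxRetries(3); // old constructor argument was 3
        System.out.println("Equivalent back off: new FixedBackOff(0L, " + retries + "L)");
        simulate(retries).forEach(System.out::println);
    }
}
```

Under that assumption, the old call with 3 and the new call with FixedBackOff(0L, 2L) both give the listener three chances before the DeadLetterPublishingRecoverer sends the record to the dead-letter topic.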