How to configure DLQ in a stateful application with Spring-Boot?
I need to create an application that uses stateful retry: it listens to a Kafka topic, calls some APIs and then commits the message. If one of the calls fails, for example with a timeout, the application must retry 4 times with a 4-second interval. If it still has not succeeded after those attempts, it should send the record to a DLQ topic.
The part I cannot get working is publishing to the DLQ topic: when I configure the DLQ, the retries do not stop and nothing is sent to the DLQ.
@KafkaListener(topics = "${topic.name}", concurrency = "1")
public void listen(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem,
        @Headers final MessageHeaders headers,
        Acknowledgment ack) {
    AberturaContaLimiteCreditoCalculadoData dados;
    if (!validarMensagem(mensagem)) {
        dados = mensagem.value().getData();
        // This RuntimeException is thrown only to force a retry.
        throw new RuntimeException();
        // ack.acknowledge();
    }
}

private boolean validarMensagem(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem) {
    return mensagem == null || mensagem.value() == null;
}
Kafka configuration:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(final ConsumerFactory<String, Object> consumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            publisherRetryDLQ(),
            new FixedBackOff(4000L, 4L)));
    return factory;
}

public DeadLetterPublishingRecoverer publisherRetryDLQ() {
    return new DeadLetterPublishingRecoverer(createKafkaTemplate(),
            (record, ex) -> new TopicPartition(topicoDLQ, 0));
}

public ProducerFactory<String, String> producerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    return new DefaultKafkaProducerFactory<>(config);
}

public KafkaOperations<String, String> createKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
Edit 2022-05-04:
With your tips about the RetryListener and setting logging.level for debugging, we managed to find the problem: the producer was not being built.
The problem now is that the Avro type we consume is different from the DLQ Avro type; the DLQ schema has an extra field that must hold the reason for the error.
2022/05/04 16:53:43.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [ERROR] o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication to limites-abertura-conta-limite-credito-calculado-convivenciaaberturaconta-dlq failed for: limites-abertura-conta-limite-credito-calculado-0@6
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema{...}]}}]}
Is there a way to do this conversion?
If I understand the problem correctly, you want to create a ProducerRecord with a different value type.
Just subclass the DLPR (DeadLetterPublishingRecoverer) and override createProducerRecord().
/**
* Subclasses can override this method to customize the producer record to send to the
* DLQ. The default implementation simply copies the key and value from the consumer
* record and adds the headers. The timestamp is not set (the original timestamp is in
* one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
* less than 0, it must be set to null in the {@link ProducerRecord}.
* @param record the failed record
* @param topicPartition the {@link TopicPartition} returned by the destination
* resolver.
* @param headers the headers - original record headers plus DLT headers.
* @param key the key to use instead of the consumer record key.
* @param value the value to use instead of the consumer record value.
* @return the producer record to send.
* @see KafkaHeaders
*/
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {
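For illustration, here is a minimal sketch of such a subclass. Two assumptions: the DLQ value class (AberturaContaLimiteCreditoCalculadoDlq, with a builder and an errorReason field) is hypothetical, standing in for your Avro-generated DLQ class, and the error reason is taken from the exception class name that the recoverer puts into the DLT headers.

import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.lang.Nullable;

public class DlqAvroPublishingRecoverer extends DeadLetterPublishingRecoverer {

    public DlqAvroPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        super(template, destinationResolver);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

        // The value as deserialized by the listener's consumer factory.
        AberturaContaLimiteCreditoCalculado original =
                (AberturaContaLimiteCreditoCalculado) record.value();

        // The recoverer has already added the DLT headers, including the exception class name.
        Header exceptionFqcn = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN);
        String reason = exceptionFqcn == null
                ? "unknown"
                : new String(exceptionFqcn.value(), StandardCharsets.UTF_8);

        // Hypothetical Avro-generated DLQ type with the extra error-reason field.
        AberturaContaLimiteCreditoCalculadoDlq dlqValue = AberturaContaLimiteCreditoCalculadoDlq.newBuilder()
                .setData(original.getData())
                .setErrorReason(reason)
                .build();

        // Per the javadoc above: a negative partition must become null in the ProducerRecord.
        Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();
        return new ProducerRecord<>(topicPartition.topic(), partition, record.key(), dlqValue, headers);
    }
}

The KafkaTemplate passed to this recoverer must, of course, use a value serializer that can handle the DLQ type; see the serializer options at the end of this answer.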
You can examine the headers to determine which exception caused the failure. If you need the actual exception, override accept() to capture it in a ThreadLocal, then call super.accept(); you can then use that ThreadLocal in createProducerRecord().
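A sketch of that ThreadLocal capture, added to the subclass above. The accept() override shown is the variant that also receives the Kafka Consumer (used by recent spring-kafka versions); verify the exact signature against your version.

// Additional import: org.apache.kafka.clients.consumer.Consumer

// Holds the exception for the duration of the publish so createProducerRecord() can read it.
private static final ThreadLocal<Exception> CURRENT_EXCEPTION = new ThreadLocal<>();

@Override
public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
    CURRENT_EXCEPTION.set(exception);
    try {
        super.accept(record, consumer, exception);
    }
    finally {
        CURRENT_EXCEPTION.remove();
    }
}

// Inside createProducerRecord() the full exception (not just its class name) is then available:
// Exception cause = CURRENT_EXCEPTION.get();
// String reason = cause == null ? "unknown" : cause.getMessage();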
There are several ways to publish different types with the same producer factory:
- You can use a DelegatingByTypeSerializer; see KafkaConsumer With Multiple Different Avro Producers And Transactions for an example (sketched below).
- You can configure the publisher with a KafkaTemplate that uses a different serializer (it has a constructor that lets you override the producer factory configuration).
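A sketch of the first option, as an extra method in the question's Kafka configuration class. The schema-registry property name and the bootstrapServer/schemaRegistryUrl fields are assumptions, and the DLQ class is the hypothetical one from above; it also assumes Confluent's KafkaAvroSerializer and spring-kafka's DelegatingByTypeSerializer, ByteArraySerializer and StringSerializer on the classpath.

public KafkaOperations<String, Object> dlqKafkaTemplate() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    config.put("schema.registry.url", schemaRegistryUrl); // assumed configuration property

    // Configure the Avro serializer explicitly so it knows the schema registry
    // (depending on the spring-kafka version, instances passed to the factory may not be configured automatically).
    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer();
    avroSerializer.configure(config, false);

    // Route the DLQ Avro class through the Avro serializer; already-serialized byte[] payloads go out as-is.
    Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
    delegates.put(AberturaContaLimiteCreditoCalculadoDlq.class, avroSerializer);
    delegates.put(byte[].class, new ByteArraySerializer());

    ProducerFactory<String, Object> dlqProducerFactory = new DefaultKafkaProducerFactory<>(
            config, new StringSerializer(), new DelegatingByTypeSerializer(delegates));
    return new KafkaTemplate<>(dlqProducerFactory);
}

Pass the resulting template to the recoverer (or the subclass above) instead of the String/String template from the question. For the second option, KafkaTemplate has a constructor that takes a Map of producer config overrides, so you can keep the existing producer factory and only override the value serializer class for the template used by the recoverer.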