如何使用 Spring-Boot 在有状态应用程序中配置 DLQ?

How to configure DLQ in a stateful application with Spring-Boot?

我需要创建一个使用有状态重试的应用程序,它监听 Kafka 主题并调用一些 API,然后提交消息。如果其中一个调用发生错误,例如超时,应用程序必须重试 4 次,间隔为 4 秒。在这四次尝试结束时,如果仍然没有成功,应用程序应将其发送到 DLQ 主题。

发送到DLQ主题的部分我做不到。因为当我尝试配置 DLQ 时,重试不会停止,也不会发送到 DLQ。

@KafkaListener(topics = "${topic.name}", concurrency = "1")
public void listen(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem,
                    @Headers final MessageHeaders headers,
                    Acknowledgment ack) {
    AberturaContaLimiteCreditoCalculadoData dados;
    if (!validarMensagem(mensagem)) {
        dados = mensagem.value().getData();
        throw new RuntimeException();
        //This throw Runtime it's just to force it to retry.

private boolean validarMensagem(ConsumerRecord<String, AberturaContaLimiteCreditoCalculado> mensagem) {
    return mensagem == null || mensagem.value() == null;


public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(final ConsumerFactory<String, Object> consumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<String, Object> factory
            = new ConcurrentKafkaListenerContainerFactory();
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(4000L, 4L)));
    return factory;

public DeadLetterPublishingRecoverer publisherRetryDLQ() {
    return new DeadLetterPublishingRecoverer(createKafkaTemplate(),
            (record, ex) -> new TopicPartition(topicoDLQ, 0));

public ProducerFactory<String, String> producerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    return new DefaultKafkaProducerFactory<>(config);

public KafkaOperations<String, String> createKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());

编辑 2022-05-04:

根据您对 RetryListener 的提示和 logging.level 对调试的提示,我们设法找到了未构建 Producer 的问题。

现在的问题是我们收到了一个与 DLQ avro 不同的 avro 消费者。区别在于DLQ有一个额外的字段,必须存储错误的原因。

2022/05/04 16:53:43.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [ERROR] o.s.k.l.DeadLetterPublishingRecoverer - Dead-letter publication to limites-abertura-conta-limite-credito-calculado-convivenciaaberturaconta-dlq failed for: limites-abertura-conta-limite-credito-calculado-0@6
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema{...}]}}]}


如果我正确理解问题,您想创建一个具有不同值类型的 ProducerRecord

只需继承 DLPR 并覆盖 createProducerRecord().

     * Subclasses can override this method to customize the producer record to send to the
     * DLQ. The default implementation simply copies the key and value from the consumer
     * record and adds the headers. The timestamp is not set (the original timestamp is in
     * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
     * less than 0, it must be set to null in the {@link ProducerRecord}.
     * @param record the failed record
     * @param topicPartition the {@link TopicPartition} returned by the destination
     * resolver.
     * @param headers the headers - original record headers plus DLT headers.
     * @param key the key to use instead of the consumer record key.
     * @param value the value to use instead of the consumer record value.
     * @return the producer record to send.
     * @see KafkaHeaders
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

您可以检查 headers 以确定导致失败的异常。如果您需要实际的异常,请覆盖 accept() 以在 ThreadLocal 中捕获它,然后调用 super.accept();然后,您可以在 createProducerRecord().


同一个producer factory发布不同types的解决方案有多种。