有没有办法在 spring-kafka 中配置自定义 RecordMessageConverter?

Is there a way to configure custom RecordMessageConverter in spring-kafka?

我创建了一个自定义 RecordMessageConverter,它在 toMessagefromMessage 中打印(我按照 https://www.confluent.io/blog/spring-for-apache-kafka-deep-dive-part-1-error-handling-message-conversion-transaction-support 中的说明创建了 Bean),但不幸的是它没有绑定到听众。我可以显式绑定它还是我应该做其他事情让它起作用?

public class MobileMessageConverter implements RecordMessageConverter {
    @Override
    public Message<?> toMessage(ConsumerRecord<?,?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer, Type payloadType) {
        System.out.println("Converting toMessage");
        return null;
    }

    @Override
    public ProducerRecord<CustomKey, CustomValue> fromMessage(Message<?> message, String defaultTopic) {
        System.out.println("Converting fromMessage");
        return new ProducerRecord("UNKNOWN",new CustomValue());
    }
}


@RestController
public class CustomKafkaController {
    @Bean
    public RecordMessageConverter converter() {
        return new CustomMessageConverter();
    }

    @KafkaListener(topics = "UNKNOWN", containerFactory = "kafkaListenerContainerFactory", groupId = "1")
    public void listenAsObject(ConsumerRecord<CustomKey, CustomValue> cr) {
        System.out.println(cr.getValue().getCustomMessage());
    }
}

我解决了!

  1. 我通过 setter.
  2. 将转换器添加到工厂 Bean (kafkaListenerContainerFactory)
  3. 我更改了转换以实现 MessagingMessageConverter 而不是 RecordMessageConverter 并且我实现了方法 extractAndConvertValue
  4. (如果您想更改为其他类型) 删除 listenAsObject 并将其替换为 ConsumerRecordtype 您的转换为

例如:
如果要将自定义值更改为字符串:
创建新转换器:

public class CustomToStringMessageConverter extends MessagingMessageConverter {
    @Override
    protected String extractAndConvertValue(ConsumerRecord<?, ?> record, Type type){
       return "EmptyString"
    }
}

将侦听器更改为:

@KafkaListener(topics = "UNKNOWN", containerFactory = "kafkaListenerStringContainerFactory", groupId = "1")
public void listen2(String cr) {
    **DO SOMETHING WITH THE CONVERTED CR**
}

最后一件事是通过以下方式获取转换后的 Bean 并将其设置在您的工厂中:

factory.setMessageConverter(new CustomToStringMessageConverter ());