如果消息是 GenericRecord,如何在 KafkaListener 方法中提取消息头详细信息

How to pull Message header details in KafkaListener method if message is a GenericRecord

我希望能够从 avro 模式类型的消息中获取 record.Key。

我有我的 kafkaConsumerConfig class,它设置属性并包含 kafkalistener contrainerfactory

    public ConsumerFactory<String, GenericRecord> kafkaConsumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,KafkaAvroDeserializer.class);  
        ...;
        
        return new DefaultKafkaConsumerFactory<>(properties);
    }
    
    //Used to poll for messages
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() 
    { 
        
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(kafkaConsumerFactory());
      return factory;
    }

这是我的听众class:

@KafkaListener(topics = "topicName", groupId = "groupIDName")
public class KafkaListenerclass {
    
    Logger logger = LoggerFactory.getLogger(KafkaListenerclass.class);
    
    @KafkaHandler
      void listenerGenericRecord(GenericRecord record) {
        logger.info("KafkaHandler[Default] {}", record);
        System.out.println("Im here generic listener..."+ record.toString());
      }
    
    @KafkaHandler
    public void listenPayload(@Payload String message,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
         System.out.println("Im here listenPayload..."+ messageKey);
    }

它总是使用我似乎无法从中提取其他消息详细信息的 listenerGEnericRecord()。
当我删除此方法以查看它是否属于使用 listenPayload() 时,我得到:

侦听器失败;嵌套异常是 org.springframework.kafka.KafkaException:找不到 class org.apache.avro.generic.GenericData$Record

的方法

除了重写不使用此 bean 结构的代码外,我似乎找不到任何帮助

这样怎么样:

void listenerGenericRecord(GenericRecord record,
           @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {

GenericRecord 是 Avro 数据的 POJO,仅存储在 Kafka 记录 body 中。 ConsumerRecord 的 headers 与 Avro 完全无关。因此,如果 Kafka 记录 headers 中存储了一些元数据,则应将它们与仅存储在 body.

中的数据区分开来