如果消息是 GenericRecord,如何在 KafkaListener 方法中提取消息头详细信息
How to pull Message header details in KafkaListener method if message is a GenericRecord
我希望能够从 avro 模式类型的消息中获取 record.Key。
我有我的 kafkaConsumerConfig class,它设置属性并包含 kafkalistener contrainerfactory
public ConsumerFactory<String, GenericRecord> kafkaConsumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,KafkaAvroDeserializer.class);
...;
return new DefaultKafkaConsumerFactory<>(properties);
}
//Used to poll for messages
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
这是我的听众class:
@KafkaListener(topics = "topicName", groupId = "groupIDName")
public class KafkaListenerclass {
Logger logger = LoggerFactory.getLogger(KafkaListenerclass.class);
@KafkaHandler
void listenerGenericRecord(GenericRecord record) {
logger.info("KafkaHandler[Default] {}", record);
System.out.println("Im here generic listener..."+ record.toString());
}
@KafkaHandler
public void listenPayload(@Payload String message,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
System.out.println("Im here listenPayload..."+ messageKey);
}
它总是使用我似乎无法从中提取其他消息详细信息的 listenerGEnericRecord()。
当我删除此方法以查看它是否属于使用 listenPayload() 时,我得到:
侦听器失败;嵌套异常是 org.springframework.kafka.KafkaException:找不到 class org.apache.avro.generic.GenericData$Record
的方法
除了重写不使用此 bean 结构的代码外,我似乎找不到任何帮助
这样怎么样:
void listenerGenericRecord(GenericRecord record,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
GenericRecord
是 Avro 数据的 POJO,仅存储在 Kafka 记录 body
中。 ConsumerRecord
的 headers 与 Avro 完全无关。因此,如果 Kafka 记录 headers 中存储了一些元数据,则应将它们与仅存储在 body.
中的数据区分开来
我希望能够从 avro 模式类型的消息中获取 record.Key。
我有我的 kafkaConsumerConfig class,它设置属性并包含 kafkalistener contrainerfactory
public ConsumerFactory<String, GenericRecord> kafkaConsumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,KafkaAvroDeserializer.class);
...;
return new DefaultKafkaConsumerFactory<>(properties);
}
//Used to poll for messages
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
这是我的听众class:
@KafkaListener(topics = "topicName", groupId = "groupIDName")
public class KafkaListenerclass {
Logger logger = LoggerFactory.getLogger(KafkaListenerclass.class);
@KafkaHandler
void listenerGenericRecord(GenericRecord record) {
logger.info("KafkaHandler[Default] {}", record);
System.out.println("Im here generic listener..."+ record.toString());
}
@KafkaHandler
public void listenPayload(@Payload String message,@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
System.out.println("Im here listenPayload..."+ messageKey);
}
它总是使用我似乎无法从中提取其他消息详细信息的 listenerGEnericRecord()。
当我删除此方法以查看它是否属于使用 listenPayload() 时,我得到:
侦听器失败;嵌套异常是 org.springframework.kafka.KafkaException:找不到 class org.apache.avro.generic.GenericData$Record
的方法除了重写不使用此 bean 结构的代码外,我似乎找不到任何帮助
这样怎么样:
void listenerGenericRecord(GenericRecord record,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey) {
GenericRecord
是 Avro 数据的 POJO,仅存储在 Kafka 记录 body
中。 ConsumerRecord
的 headers 与 Avro 完全无关。因此,如果 Kafka 记录 headers 中存储了一些元数据,则应将它们与仅存储在 body.