Send / Receive Java Objects through Kafka
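I want to send Java objects through Kafka and receive them as Java objects. I thought the following ProducerFactory and ConsumerFactory configuration would be enough, but the payload I receive is an org.apache.kafka.clients.consumer.ConsumerRecord. What do I need to change in the ConsumerFactory?
Consumer configuration: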
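import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    // Not a @Bean: builds a separate consumer factory for the given group id.
    public ConsumerFactory<String, Object> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        // Values are deserialized from JSON; every package is trusted.
        JsonDeserializer<Object> deserializer = new JsonDeserializer<>(Object.class);
        deserializer.addTrustedPackages("*");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> headersKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("headers"));
        return factory;
    }
}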
Producer configuration:
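import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // String keys, JSON-serialized values.
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}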
As you noted, you can get the Java object from the consumer record's value.
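import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Listener {
    // Attributes are omitted here as in the original answer; a real listener
    // needs at least a topic to subscribe to.
    @KafkaListener
    void listen(ConsumerRecord<String, Car> record) {
        // The record wraps the payload; value() returns the deserialized object.
        Car car = record.value();
    }
}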
Calling record.value() returns the corresponding Java object, because you already have deserialization in place.
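To illustrate why the listener sees a wrapper rather than the object itself, here is a minimal sketch that does not use Kafka at all. `Envelope`, the fields of `Car`, the `make,year` wire format, and the `receive` helper are hypothetical stand-ins invented for this example; they are not Kafka or Spring classes. The only point is that the bytes are deserialized before being wrapped, so `value()` hands back a typed object.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Illustration only: none of these types come from Kafka or Spring.
class RecordValueDemo {

    // Example payload; the fields are assumed, the question does not show Car.
    static final class Car {
        final String make;
        final int year;

        Car(String make, int year) {
            this.make = make;
            this.year = year;
        }
    }

    // Simplified stand-in for ConsumerRecord: metadata plus an already-typed value.
    static final class Envelope<K, V> {
        final String topic;
        final long offset;
        final K key;
        private final V value;

        Envelope(String topic, long offset, K key, V value) {
            this.topic = topic;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }

        V value() {
            return value;
        }
    }

    // Toy wire format "make,year"; a real setup would use a JSON serializer.
    static byte[] serialize(Car car) {
        return (car.make + "," + car.year).getBytes(StandardCharsets.UTF_8);
    }

    static Car deserialize(byte[] bytes) {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split(",", 2);
        return new Car(parts[0], Integer.parseInt(parts[1]));
    }

    // Mimics the consumer side: deserialize the bytes first, then wrap the result.
    static <V> Envelope<String, V> receive(String topic, long offset, String key,
                                           byte[] payload, Function<byte[], V> deserializer) {
        return new Envelope<>(topic, offset, key, deserializer.apply(payload));
    }

    public static void main(String[] args) {
        byte[] wire = serialize(new Car("Volvo", 2020));
        Envelope<String, Car> envelope =
                receive("cars", 0L, "car-1", wire, RecordValueDemo::deserialize);
        Car car = envelope.value(); // already a Car, no further conversion needed
        System.out.println(car.make + " " + car.year); // prints "Volvo 2020"
    }
}
```

The wrapper exists because a consumer usually wants the key, topic, and offset alongside the payload; the payload itself is one method call away.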
I am receiving payload as org.apache.kafka.clients.consumer.ConsumerRecord
Have you looked at the Javadoc for that class and noticed the .value() method?
FWIW, Avro or Protobuf serializers are more efficient than JSON for sending objects.
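A rough sketch of where that efficiency comes from, using only the JDK. This is not Avro or Protobuf: `DataOutputStream` is used here as a stand-in for a schema-based binary encoding, and the `make`/`year` fields are assumed for illustration. JSON repeats the field names and punctuation in every message, while a binary format whose schema is known to both sides only has to carry the values.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustration only: compares a hand-built JSON string with a plain binary layout.
class EncodingSizeDemo {

    // Toy JSON encoding (no escaping); field names travel with every message.
    static byte[] asJson(String make, int year) {
        String json = "{\"make\":\"" + make + "\",\"year\":" + year + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Binary layout: 2-byte length prefix + string bytes, then a 4-byte int.
    // Field names are implied by the agreed-upon order, as with a schema.
    static byte[] asBinary(String make, int year) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(make);
            out.writeInt(year);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println("JSON bytes:   " + asJson("Volvo", 2020).length);   // 28
        System.out.println("binary bytes: " + asBinary("Volvo", 2020).length); // 11
    }
}
```

The gap grows with the number of fields and messages, which is why schema-based formats are often preferred for high-volume topics.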