通过 Kafka 发送/接收 Java 个对象

Send / Receive Java Objects through Kafka

我想通过 Kafka 发送 Java 对象并接收它们作为 Java 对象。我认为 ProducerFactory 和 ConsumerFactory 的以下配置就足够了,但我收到的有效负载为 org.apache.kafka.clients.consumer.ConsumerRecord。我需要在 ConsumerFactory 中更改什么?

消费者配置:

@Configuration
public class KafkaConsumerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

public ConsumerFactory<String, Object> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    JsonDeserializer<Object> deserializer = new JsonDeserializer<>(Object.class);

    deserializer.addTrustedPackages("*");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> headersKafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory("headers"));
  return factory;
}

}

生产者配置:

@Configuration
public class KafkaProducerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

}

如您所述,您可以使用消费者记录的值获取 java 对象。

 @component
 public class Listener {

  @kafkaListener
  void listen(ConsumerRecord<String, Car> record) {
  Car car = record.value();
  }
 }

通过访问 record.value()。您可以获得相关的 java 对象,因为您必须在适当的位置进行反序列化。

I am receiving payload as org.apache.kafka.clients.consumer.ConsumerRecord

您是否看过 class 的 Javadoc 并注意到 .value() 方法?

FWIW、Avro 或 Protobuf 序列化程序在发送对象方面比 JSON

更有效