Spring Boot Kafka: sending objects between different microservices

This is the Kafka configuration of my producer microservice:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapService;

    // Producer factory: String keys, values serialized to JSON
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapService);
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configMap);
    }

    // Template used to send messages
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

I send messages like this:

kafkaTemplate.send(TOPIC_NAME, message);

I have no problems with the producer.

This is the Kafka configuration of the consumer microservice:

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class,
                ConsumerConfig.GROUP_ID_CONFIG, "testId"
        );
    }
}

Using the same configuration, but with String in place of Object or in place of my custom object, I get the following stack trace:

    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1763) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1286) ~[spring-kafka-2.8.3.jar:2.8.3]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition kafka-topic-2-0 at offset 25. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1500) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
    ... 3 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.kafkaproducer.model.Message]; nested exception is java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.0.jar:na]
    ... 15 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
    at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
    at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
    at org.springframework.boot.devtools.restart.classloader.RestartClassLoader.loadClass(RestartClassLoader.java:145) ~[spring-boot-devtools-2.6.4.jar:2.6.4]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
    at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
    at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
    at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.16.jar:5.3.16]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) ~[spring-kafka-2.8.3.jar:2.8.3]
    ... 18 common frames omitted

Do you have any advice for using a producer and a consumer in different services?

[Edit] I am adding the listener, since it was asked for:

@KafkaListener(topics = TOPIC_NAME, groupId = "testId")
public void listener(@Payload Message rcvMessage){

    log.info("message: {}", rcvMessage);

}

Note that the Message class is defined with the same fields in both projects.

[Edit 2] It works now. I removed the headers in the deserializer like this:

@Bean
public ConsumerFactory<String, Message> consumerFactory() {
    JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class, false);
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

Is this acceptable, or is it just a workaround?

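When you send a message to Kafka, headers are sent along with the message. By default, one of these headers contains the fully qualified class name of the custom object.

For example: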

 com.example.kafkaproducer.model.Message

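The Message class on the consumer side must be created in the same package as on the producer side. The class name from the header is resolved when the consumer receives the message, so if the class lives in any other package you get the following error.

Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message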
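To illustrate the failing step in isolation: the stack trace in the question ends in `Class.forName`, which looks up the class named in the header. The following is a minimal sketch that uses only the JDK and is not Spring Kafka's actual code. The class `TypeHeaderLookup` and the method `canResolve` are hypothetical names introduced for this example, and it assumes the producer's `Message` class is not on the classpath, as is the case in the consumer service.

```java
class TypeHeaderLookup {

    // Mimics the lookup step: try to load a class from the fully qualified
    // name that would arrive in the type header.
    public static boolean canResolve(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            // Same root cause as in the stack trace from the question.
            return false;
        }
    }

    public static void main(String[] args) {
        // A JDK class is always present, so the lookup succeeds.
        System.out.println(canResolve("java.lang.String"));
        // The producer's class is absent here, so the lookup fails.
        System.out.println(canResolve("com.example.kafkaproducer.model.Message"));
    }
}
```

Having a class with the same fields in the consumer project does not help on its own, because the lookup goes by the full name, package included.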

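Most likely, if you change the package of the Message class on the consumer side so that it matches the producer, your error will be fixed.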
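If keeping the two classes in different packages is preferable, Spring Kafka's `JsonSerializer` and `JsonDeserializer` also accept type mappings, so the header carries a logical token instead of a class name. The following is only a sketch under assumptions: the consumer's class is assumed to live in a hypothetical package `com.example.kafkaconsumer.model`, and the class name `TypeMappingSketch` and the token `message` are made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class TypeMappingSketch {

    // Producer service: write the token "message" into the type header
    // instead of the producer's fully qualified class name.
    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS,
                "message:com.example.kafkaproducer.model.Message");
        return props;
    }

    // Consumer service: map the same token to the consumer's own class.
    // The package name below is an assumption for illustration.
    static Map<String, Object> consumerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testId");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "message:com.example.kafkaconsumer.model.Message");
        // Explicitly trust only the consumer's own model package.
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.kafkaconsumer.model");
        return props;
    }
}
```

As far as I know, these properties are only read when the factory creates the serializer or deserializer from the configured class, so they would be passed to `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` without also handing in a deserializer instance as the question's consumer config does. Ignoring the type headers and naming the target type explicitly, as in Edit 2 of the question, uses a public `JsonDeserializer` constructor and should also be a reasonable option when a topic carries a single message type.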