通过 Kafka Consumer 消费消息时出错

Error while consuming messages via Kafka Consumer

我是 Apache Kafka 的新手。我试图找出解决此问题的方法,但失败了。 生产者代码运行良好,数据存储在 Kafka 主题中。 我在 Kafka Producer 配置中使用 JsonSerializer 作为 ValueSerializer。

NoramlizedEvent 是一个简单的 POJO,用于生产者和消费者。

我的制作人代码:

    public void saveMessage(final IMMessage message) {

        for (NormalizedEvent event :
                message.getNormalizedEvents()) {

            event.setServiceId(message.getServiceId());

            ProducerRecord<String, NormalizedEvent> producerRecord = buildProducerRecord(null, event, TOPIC);
            ListenableFuture<SendResult<String, NormalizedEvent>> listenableFuture = kafkaTemplate.send(producerRecord);
            listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, NormalizedEvent>>() {

                @Override
                public void onFailure(Throwable ex) {
                    handleFailure(null, event, ex);
                }

                @Override
                public void onSuccess(SendResult<String, NormalizedEvent> result) {
                    properties.put("partitionId", result.getRecordMetadata().partition());
                    properties.put("offsetId", result.getRecordMetadata().offset());
                    handleSuccess(null, event, result);
                    imMessageDBService.setDBProperties(event, properties);
                }
            });

        }
    }

    private ProducerRecord<String, NormalizedEvent> buildProducerRecord(String key, NormalizedEvent value, String topic) {
        return new ProducerRecord<>(topic, key, value);
    }

我的消费者代码:

 @Bean
    public ConsumerFactory<String, NormalizedEvent> userConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(NormalizedEvent.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> userKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }

错误信息:

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) [spring-kafka-2.7.0.jar:2.7.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition SAP-S_4-HANA-2 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.sap.innovision.springkafkaproducer.model.NormalizedEvent' is not in the trusted packages: [java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.0.jar:2.7.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]

Caused by: java.lang.IllegalArgumentException: The class 'com.sap.innovision.springkafkaproducer.model.NormalizedEvent' is not in the trusted packages: [java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all ().

看起来事件在生产者和消费者中处于不同的包中。

默认情况下,反序列化器使用 headers 中的类型信息,如果没有类型 headers,则泛型参数用作后备。

解串器上有两种解决方法:

/**
 * Set to false to ignore type information in headers and use the configured
 * target type instead.
 * Only applies if the preconfigured type mapper is used.
 * Default true.
 * @param useTypeHeaders false to ignore type headers.
 * @since 2.2.8
 */
public void setUseTypeHeaders(boolean useTypeHeaders) {
    if (!this.typeMapperExplicitlySet) {
        this.useTypeHeaders = useTypeHeaders;
        setUpTypePrecedence(Collections.emptyMap());
    }
}

或在序列化器上:

/**
 * Set to false to disable adding type info headers.
 * @param addTypeInfo true to add headers.
 * @since 2.1
 */
public void setAddTypeInfo(boolean addTypeInfo) {
    this.addTypeInfo = addTypeInfo;
}

参见 the documentation 和 Javadoc。