Deserialization exception in Kafka
I am getting the following error when using kafka-streams.
[Kafka Stream] 10:09:26.442 ERROR --- o.a.k.s.e.LogAndFailExceptionHandler: Exception caught
during Deserialization, taskId: 0_0, topic: t.commodity.promotion, partition: 0, offset: 0
java.lang.IllegalArgumentException: The class 'com.course.kafka.kafkaorder.Broker.Message.PromotionMessage'
is not in the trusted packages: [java.util, java.lang, com.course.stream.broker.message,
com.course.stream.broker.message.*]. If you believe this class is safe to deserialize,
please provide its name. If the serialization is only done by a trusted source, you can also enable
trust all (*).
[Kafka Stream] 10:09:26.444 ERROR --- o.a.k.s.p.internals.StreamThread: stream-thread [kafka-stream-7972
450a-443b-8b7b-007e9fdf8e4c-StreamThread-1] Encountered the following exception during
processing and the thread is going to shut down:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to
fail upon a deserialization error. If you would rather have the streaming pipeline continue after a
deserialization error, please set the default.deserialization.exception.handler appropriately.
My code is:
@Configuration
public class PromotionJsonSerde {

    @Bean
    public KStream<String, PromotionMessage> kStreamPromotionUppercase(StreamsBuilder builder) {
        var stringSerde = Serdes.String();
        var jsonSerde = new JsonSerde<>(PromotionMessage.class);
        KStream<String, PromotionMessage> sourceStream = builder.stream("t.commodity.promotion",
                Consumed.with(stringSerde, jsonSerde));
        KStream<String, PromotionMessage> uppercaseStream = sourceStream.mapValues(this::uppercasePromotionCode);
        uppercaseStream.to("t.commodity.promotion-uppercase", Produced.with(stringSerde, jsonSerde));
        return sourceStream;
    }

    private PromotionMessage uppercasePromotionCode(PromotionMessage message) {
        return new PromotionMessage(message.getPromotionCode().toUpperCase());
    }
}
The PromotionMessage class:
public class PromotionMessage {
    private String promotionCode;
    // getters, setters, toString
}
application.yml
logging:
  pattern:
    console: "[Kafka Stream] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"
spring:
  main:
    banner-mode: OFF
  kafka:
    listener:
      missing-topics-fatal: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
After seeing the error, I tried adding the following to my application.yml file, but I still get the same error.
spring:
  kafka:
    producer:
    streams:
      properties:
        default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler,org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: "*"
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            process-in-0.consumer:
              deserializationExceptionHandler: logAndContinue
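Why am I getting a deserialization error?
If I check with a consumer in the terminal, I do receive the messages sent by the producer.
Can anyone help me?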
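I see that you are creating the serde yourself with new JsonSerde<>(PromotionMessage.class);
In that case the package of that class is automatically added to the trusted packages, hence:
trusted packages: [java.util, java.lang, com.course.stream.broker.message, com.course.stream.broker.message.*]
When you create your own serde, the properties (such as spring.json.trusted.packages in application.yml) are ignored. The deserializer is trying to create com.course.kafka.kafkaorder.Broker.Message.PromotionMessage, which is in a different package; most likely the producer uses a different class.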
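To make the failure mode concrete, here is a toy model of header-based type resolution using only the JDK. It is not Spring Kafka's actual implementation: the class TypeHeaderDemo, the method resolveType, and the FALLBACK constant are hypothetical names, the fallback class name is inferred from the trusted packages in the log, and wildcard entries such as com.course.stream.broker.message.* are only matched literally here.

```java
import java.util.List;
import java.util.Optional;

/** Toy model of type resolution from a record header; NOT Spring Kafka's real code. */
public class TypeHeaderDemo {

    // Trusted packages exactly as printed in the error log.
    static final List<String> TRUSTED = List.of(
            "java.util", "java.lang",
            "com.course.stream.broker.message", "com.course.stream.broker.message.*");

    // Assumption: the consumer-side class passed to the serde lives in this package.
    static final String FALLBACK = "com.course.stream.broker.message.PromotionMessage";

    /** Returns the class name a record would be deserialized into. */
    static String resolveType(Optional<String> typeHeader, boolean useTypeHeaders) {
        if (!useTypeHeaders || typeHeader.isEmpty()) {
            return FALLBACK; // header ignored or absent: use the configured type
        }
        String className = typeHeader.get();
        String pkg = className.substring(0, className.lastIndexOf('.'));
        // Simplification: exact package match only, no sub-package wildcards.
        if (!TRUSTED.contains("*") && !TRUSTED.contains(pkg)) {
            throw new IllegalArgumentException(
                    "The class '" + className + "' is not in the trusted packages: " + TRUSTED);
        }
        return className;
    }

    public static void main(String[] args) {
        // The producer wrote its own class name into the type header.
        Optional<String> header =
                Optional.of("com.course.kafka.kafkaorder.Broker.Message.PromotionMessage");

        // Headers ignored: the fallback type is used and nothing is rejected.
        System.out.println(resolveType(header, false));

        // Headers honoured: the producer's package is not trusted, so this throws.
        try {
            resolveType(header, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the record is rejected before its payload is ever looked at, which is consistent with the error appearing at offset 0 even though a console consumer can read the same message.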
Add: ((JsonDeserializer) jsonSerde.deserializer()).setUseTypeHeaders(false);
This tells the deserializer to ignore the type information in the headers and use the provided fallback type instead.
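Applied to the configuration class from the question, the change might look like the sketch below. It assumes PromotionMessage is in the same package as the configuration class and has both a no-argument constructor (for Jackson) and the String constructor the question's code already uses; the inline lambda replaces the helper method only to keep the sketch short.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration
public class PromotionJsonSerde {

    @Bean
    public KStream<String, PromotionMessage> kStreamPromotionUppercase(StreamsBuilder builder) {
        var stringSerde = Serdes.String();
        var jsonSerde = new JsonSerde<>(PromotionMessage.class);

        // Ignore the type header written by the producer and always deserialize
        // into PromotionMessage, the type passed to the serde constructor.
        ((JsonDeserializer<PromotionMessage>) jsonSerde.deserializer()).setUseTypeHeaders(false);

        KStream<String, PromotionMessage> sourceStream = builder.stream(
                "t.commodity.promotion", Consumed.with(stringSerde, jsonSerde));

        // Same transformation as in the question: uppercase the promotion code.
        sourceStream
                .mapValues(m -> new PromotionMessage(m.getPromotionCode().toUpperCase()))
                .to("t.commodity.promotion-uppercase", Produced.with(stringSerde, jsonSerde));

        return sourceStream;
    }
}
```

Because the serde is built in code, this setting has to be applied in code as well; the application.yml entries shown in the question do not reach a serde that is passed explicitly through Consumed.with.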