Serializing a Kafka Avro object fails with "org.apache.kafka.common.errors.SerializationException: Unknown magic byte!"
I have a Java Spring application that uses Kafka to consume Avro messages.
In my test, I serialize an Avro object to a byte[] and send it to the topic with a KafkaTemplate<String, byte[]>.
This is how I serialize the object:
    private static byte[] serialize(Product product) {
        var specificDatumWriter = new SpecificDatumWriter<>(Product.class);
        try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
            var binaryEncoder = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream, null);
            specificDatumWriter.write(product, binaryEncoder);
            binaryEncoder.flush();
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    kafkaTemplate.send("products", uuid.toString(), serialize(product));
I followed this example to serialize the object: https://www.baeldung.com/java-apache-avro#1-serialization
However, when my @KafkaListener tries to deserialize the message, I get this error:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2060)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2045)
    ...
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserializationException(ErrorHandlingDeserializer.java:216)
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:191)
    ...
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1232)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1127)
    ... 4 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
What am I missing? What am I doing wrong in the serialization?
I have it working now. I had to use KafkaAvroSerializer to serialize the object.
    private byte[] serialize(Product product) {
        try (var kafkaAvroSerializer = new KafkaAvroSerializer()) {
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
            props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
            props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);
            kafkaAvroSerializer.configure(props, false);
            return kafkaAvroSerializer.serialize("products", product);
        }
    }
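For anyone wondering why the plain Avro bytes were rejected: as far as I understand it, the Confluent deserializer expects every message to start with a 5-byte header (a magic byte 0, then a 4-byte big-endian schema ID) before the Avro binary payload. A SpecificDatumWriter writes only the payload, so the first byte is simply the start of the first field and is usually not 0. Below is a minimal sketch of that framing using only the JDK. The class name ConfluentWireFormat, its methods, the schema ID 42 and the sample payload bytes are all made up for illustration; in real code the ID comes from the Schema Registry, which is what KafkaAvroSerializer takes care of.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only. It is not part of any Confluent library.
class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Prepends the header to an already Avro-encoded payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // True if the message is long enough and starts with the magic byte.
    static boolean hasConfluentHeader(byte[] message) {
        return message.length >= HEADER_SIZE && message[0] == MAGIC_BYTE;
    }

    // Reads the schema ID stored right after the magic byte.
    static int schemaId(byte[] message) {
        if (!hasConfluentHeader(message)) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] rawAvro = {0x48, 0x02}; // stand-in for SpecificDatumWriter output
        byte[] framed = frame(42, rawAvro); // 42 is an assumed schema ID
        System.out.println(hasConfluentHeader(rawAvro));
        System.out.println(hasConfluentHeader(framed));
        System.out.println(schemaId(framed));
    }
}
```

Running a check like hasConfluentHeader on the test payload before sending it is a quick way to see whether the bytes were produced by plain Avro or by KafkaAvroSerializer. I would not frame messages by hand in production, since the schema ID has to match what is registered for the subject.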