Problems deserializing an Avro-serialized Kafka Stream
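I am getting an exception when trying to materialize a state store. I am running Kafka 1.0, Confluent's Schema Registry 4.0, and Avro 1.8.2. I generated the Pojo with Avro's Maven plugin and deployed the schema to the Confluent server with the Confluent Maven plugin. I am able to produce a message to the STREAM1 topic. Here is the code that sets up the stream: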
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
properties.put(StreamsConfig.CLIENT_ID_CONFIG, "cleant-id");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
StreamsBuilder builder = new StreamsBuilder();
Serde<Pojo> pojoSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
pojoSerde.configure(serdeConfig, false);
Consumed<String, Pojo> consumed = Consumed.with(Serdes.String(), pojoSerde);
KStream<String, Pojo> source = builder.stream(TopicName.STREAM1.toString(), consumed);
KTable<String, Long> storePojoCount = source
    .groupBy((key, value) -> key)
    .count(Materialized.as(StoreName.STORE_WORD_COUNT.toString()));
Produced<String, Long> produced = Produced.with(Serdes.String(), Serdes.Long());
storePojoCount.toStream().to(TopicName.STREAM2.toString(), produced);
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
This produces the following exception:
Exception in thread "cleant-id-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:546)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
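How do I configure this SpecificAvroSerde so that the stream deserializes successfully?
The problem is that the Materialized object has no appropriate serdes. Avro tries to deserialize the KTable values because Avro is the default value serde, and it cannot do so because the KTable values are actually Longs.
Creating the Materialized object with the correct serdes solves the problem.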
protected <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> persistentStore(StoreName storeName, Serde<K> keyType, Serde<V> valueType) {
    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(storeName.toString());
    return Materialized.<K, V>as(storeSupplier).withKeySerde(keyType).withValueSerde(valueType);
}
Any store supplier can be used here; this is just the one that met my needs.
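For completeness, here is a rough sketch of how a helper like this might be wired into the count step from the question. It is an illustration under assumptions, not the exact code I run: the class name CountTopologySketch, the topic names "stream1" and "stream2", and the store name "pojo-counts" are placeholders, the helper takes a plain String instead of the StoreName enum, and the source stream is read with the default serdes instead of the Consumed instance from the question so that the example needs only the kafka-streams dependency.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Hypothetical class name; only the Kafka Streams calls are real API.
public class CountTopologySketch {

    // Same idea as the helper above, but keyed by a plain String store name
    // (assumption: the StoreName enum from the question is not available here).
    static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> persistentStore(
            String storeName, Serde<K> keySerde, Serde<V> valueSerde) {
        KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore(storeName);
        return Materialized.<K, V>as(supplier)
                .withKeySerde(keySerde)
                .withValueSerde(valueSerde);
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Placeholder source: in the question this is
        // builder.stream(topic, Consumed.with(Serdes.String(), pojoSerde)).
        KStream<String, Object> source = builder.stream("stream1");

        // The count store is given explicit String/Long serdes, so it no longer
        // falls back to the default (Avro) value serde from the StreamsConfig.
        KTable<String, Long> counts = source
                .groupBy((key, value) -> key)
                .count(persistentStore("pojo-counts", Serdes.String(), Serdes.Long()));

        counts.toStream().to("stream2", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Building and describing a topology does not require a running broker.
        System.out.println(build().describe());
    }
}
```

Only the Materialized argument passed to count() differs from the question's topology; the rest of the pipeline is unchanged.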