Kafka Streams 中的消息密钥为 Long
Message key as Long in Kafka Streams
我正在尝试使用 Long 作为消息密钥的类型,但我得到了
Exception in thread "kafka_stream_app-f236aaca-3f90-469d-9d32-20ff694806ff-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize key for record. topic=test, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:38)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
我查了一下 data.length
是 7
。
我在 streamsConfiguration 中设置了
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
我用
KStream<Long, GenericRecord> stream = builder.stream(topic);
我试过通过一个简单的应用程序和 kafka-avro-console-producer
:
发送消息
/opt/confluent-3.3.0/bin/kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic test \
--property key.separator=, \
--property parse.key=true \
--property key.schema='{"type":"long"}' \
--property value.schema='{"type":"string"}' \
--property schema.registry.url=http://localhost:8081
有消息
123,"293"
使用 kafka-avro-console-consumer
我可以使用消息并查看(使用 --property print.key=true
发送的密钥是正确的 123
)
知道解码邮件时可能出现什么问题吗?
因为您使用的是 kafka-avro-console-producer
,密钥未序列化为普通 Long
,而是序列化为 Avro 类型。因此,您需要使用相应的 Avro Serde,其架构与您在写入路径上使用的架构相同(即 '{"type":"long"}"
)。
此外,您的 return 类型不会是 Long
而是 Avro 类型。
我正在尝试使用 Long 作为消息密钥的类型,但我得到了
Exception in thread "kafka_stream_app-f236aaca-3f90-469d-9d32-20ff694806ff-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize key for record. topic=test, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:38)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
我查了一下 data.length
是 7
。
我在 streamsConfiguration 中设置了
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
我用
KStream<Long, GenericRecord> stream = builder.stream(topic);
我试过通过一个简单的应用程序和 kafka-avro-console-producer
:
/opt/confluent-3.3.0/bin/kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic test \
--property key.separator=, \
--property parse.key=true \
--property key.schema='{"type":"long"}' \
--property value.schema='{"type":"string"}' \
--property schema.registry.url=http://localhost:8081
有消息
123,"293"
使用 kafka-avro-console-consumer
我可以使用消息并查看(使用 --property print.key=true
发送的密钥是正确的 123
)
知道解码邮件时可能出现什么问题吗?
因为您使用的是 kafka-avro-console-producer
,密钥未序列化为普通 Long
,而是序列化为 Avro 类型。因此,您需要使用相应的 Avro Serde,其架构与您在写入路径上使用的架构相同(即 '{"type":"long"}"
)。
此外,您的 return 类型不会是 Long
而是 Avro 类型。