Kafka Streams: SerializationException: Size of data received by LongDeserializer is not 8
I have a small application that counts colours using Apache Kafka:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class FavouriteColor {
    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
    private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
    private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";
    private static final String APPLICATION_ID = "favourite-colour-java";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream(INPUT_TOPIC_NAME);

        // "user,colour" -> key = user, value = colour; keep only green, blue and red
        KStream<String, String> usersAndColours = textLines
                .filter((key, value) -> value.contains(","))
                .selectKey((key, value) -> value.split(",")[0].toLowerCase())
                .mapValues(value -> value.split(",")[1].toLowerCase())
                .filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));
        usersAndColours.to(INTERMEDIATE_TOPIC_NAME);

        // Read the re-keyed records back as a table (latest colour per user) and count per colour
        KTable<String, String> usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);
        KTable<String, Long> favouriteColours = usersAndColoursTable
                .groupBy((user, colour) -> new KeyValue<>(colour, colour))
                .count(Named.as("CountsByColours"));
        favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp();
        streams.start();
        System.out.println(streams);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
I created the topics and started the producer and consumer from a terminal:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic favourite-colour-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input
I typed the following input into the producer:
stephane,blue
john,green
stephane,red
alice,red
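For this input the intended result can be worked out by hand: stephane's later red replaces his earlier blue, so the final counts should be blue 0, green 1, red 2. The following minimal stdlib-only Java sketch replays the four lines with table-style (latest value per user) counting, assuming the same filtering as the topology. The class ExpectedColourCounts and the helper countFavourites are hypothetical names, and the length guard is an addition the original lacks.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedColourCounts {
    // Same colour whitelist as the topology's second filter.
    private static final List<String> ALLOWED = Arrays.asList("green", "blue", "red");

    // Hypothetical helper: replays "user,colour" lines, keeps only the latest colour
    // per user (table/upsert semantics) and counts how many users hold each colour.
    static Map<String, Long> countFavourites(String... lines) {
        Map<String, String> latestColourByUser = new HashMap<>();
        Map<String, Long> counts = new TreeMap<>(); // sorted for stable printing
        for (String line : lines) {
            String[] parts = line.split(",");
            if (parts.length < 2) {
                continue; // skip lines without a "user,colour" pair
            }
            String user = parts[0].toLowerCase();
            String colour = parts[1].toLowerCase();
            if (!ALLOWED.contains(colour)) {
                continue;
            }
            String previous = latestColourByUser.put(user, colour);
            if (previous != null) {
                counts.merge(previous, -1L, Long::sum); // user left their old colour
            }
            counts.merge(colour, 1L, Long::sum); // user joined their new colour
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {blue=0, green=1, red=2}
        System.out.println(countFavourites("stephane,blue", "john,green", "stephane,red", "alice,red"));
    }
}
```

Kafka Streams' KGroupedTable count likewise subtracts from the old group and adds to the new one when a key's value changes, so a count of 0 for blue is what the output topic should end up holding.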
On the consumer side I get this error:
stephane Processed a total of 1 messages
[2021-11-27 21:31:58,155] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)
at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo(ConsoleConsumer.scala:519)
at scala.Option.map(Option.scala:242)
at kafka.tools.DefaultMessageFormatter.deserialize(ConsoleConsumer.scala:519)
at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
What is the problem here? I did some brief research and found similar questions from other people, but their solutions don't seem to work for me.
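You configured the consumer's value deserializer as LongDeserializer, but the data it reads is a String. The cause is in the topology: INTERMEDIATE_TOPIC_NAME is set to "favourite-colour-output", the same topic as OUTPUT_TOPIC_NAME, so usersAndColours.to(INTERMEDIATE_TOPIC_NAME) writes the String user/colour records (for example stephane -> blue) into the output topic alongside the Long counts. That is why the consumer prints the key stephane and then fails on its value. Point INTERMEDIATE_TOPIC_NAME at the separate topic you already created, user-keys-and-colours, then delete and recreate favourite-colour-output, because the String records already written there will still be returned with --from-beginning.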
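The 8-byte rule is easy to see in isolation. Kafka's LongSerializer writes a long as exactly 8 big-endian bytes, and LongDeserializer rejects any other length. The sketch below mimics that check using only the JDK; LongSizeCheck and decodeLong are hypothetical names, it throws IllegalArgumentException where Kafka throws SerializationException, and it omits Kafka's null handling.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongSizeCheck {
    // Hypothetical helper mirroring LongDeserializer's length check:
    // a serialized long must be exactly 8 bytes, read big-endian.
    static long decodeLong(byte[] data) {
        if (data.length != 8) {
            throw new IllegalArgumentException(
                    "Size of data received by LongDeserializer is not 8 (got " + data.length + ")");
        }
        return ByteBuffer.wrap(data).getLong(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        // A count of 2 encoded as 8 big-endian bytes decodes fine.
        byte[] countBytes = ByteBuffer.allocate(8).putLong(2L).array();
        System.out.println(decodeLong(countBytes)); // prints 2

        // A String value such as "blue" is only 4 bytes in UTF-8, so the check fails.
        byte[] colourBytes = "blue".getBytes(StandardCharsets.UTF_8);
        try {
            decodeLong(colourBytes);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The value stored for key stephane in the output topic is the String blue, which UTF-8 encodes in 4 bytes, so it can never pass an 8-byte check no matter which long-based deserializer reads it.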