Kafka Streams: SerializationException: Size of data received by LongDeserializer is not 8

I have a small Kafka Streams application that counts favourite colours:

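import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class FavouriteColor {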


    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
    private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
    private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";

    private static final String APPLICATION_ID = "favourite-colour-java";


    public static void main(String[] args) {

        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines = builder.stream(INPUT_TOPIC_NAME);

        KStream<String, String> usersAndColours = textLines
            .filter((key, value) -> value.contains(","))
            .selectKey((key, value) -> value.split(",")[0].toLowerCase())
            .mapValues(value -> value.split(",")[1].toLowerCase())
            .filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));

        usersAndColours.to(INTERMEDIATE_TOPIC_NAME);
        KTable<String, String> usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);

        KTable<String, Long> favouriteColours = usersAndColoursTable
            .groupBy((user, colour) -> new KeyValue<>(colour, colour))
            .count(Named.as("CountsByColours"));

        favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);

        streams.cleanUp();
        streams.start();

        System.out.println(streams);

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

I created the topics and started a console consumer and a console producer from the terminal:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact



kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic favourite-colour-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer



kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input

I typed the following input into the producer:

stephane,blue
john,green
stephane,red
alice,red
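For reference, this is what the topology is meant to compute for that input. The sketch below replays the same steps in plain Java with no Kafka dependency: skip malformed lines, key by lowercased user, keep only green/blue/red, remember each user's latest colour (the KTable), and keep a per-colour count that subtracts a user's old colour when it changes. It assumes a single partition and in-order processing and only computes the final state, not the individual update records Kafka Streams emits; FavouriteColourSketch and countFavourites are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FavouriteColourSketch {

    private static final List<String> ALLOWED_COLOURS = Arrays.asList("green", "blue", "red");

    // Returns colour -> number of users whose latest valid choice is that colour.
    static Map<String, Long> countFavourites(List<String> lines) {
        Map<String, String> colourByUser = new HashMap<>(); // stands in for the user -> colour KTable
        Map<String, Long> countByColour = new TreeMap<>();  // stands in for count(), sorted for printing

        for (String line : lines) {
            String[] parts = line.split(",");
            if (parts.length < 2) {
                continue; // skip lines without both a user and a colour
            }
            String user = parts[0].toLowerCase();    // selectKey
            String colour = parts[1].toLowerCase();  // mapValues
            if (!ALLOWED_COLOURS.contains(colour)) {
                continue;                            // colour filter
            }
            String previous = colourByUser.put(user, colour);
            if (previous != null) {
                countByColour.merge(previous, -1L, Long::sum); // subtract the user's old colour
            }
            countByColour.merge(colour, 1L, Long::sum);        // add the user's new colour
        }
        return countByColour;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("stephane,blue", "john,green", "stephane,red", "alice,red");
        System.out.println(countFavourites(input));
    }
}
```

Running main prints {blue=0, green=1, red=2}: stephane's switch from blue to red is applied as a subtraction from blue and an addition to red, mirroring how a KGroupedTable count handles an updated row.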

On the consumer side I get this error:

stephane    Processed a total of 1 messages
[2021-11-27 21:31:58,155] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
    at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)
    at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo(ConsoleConsumer.scala:519)
    at scala.Option.map(Option.scala:242)
    at kafka.tools.DefaultMessageFormatter.deserialize(ConsoleConsumer.scala:519)
    at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

What is the problem here? I did some research and found similar questions from other people, but none of those solutions seem to work for me.

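You set the consumer's value deserializer to LongDeserializer, but the record it failed on holds a String. The cause is in your constants: INTERMEDIATE_TOPIC_NAME is "favourite-colour-output", the same as OUTPUT_TOPIC_NAME. So usersAndColours.to(INTERMEDIATE_TOPIC_NAME) writes the String user/colour records (here stephane/blue, which is why the key stephane is printed just before the exception) into the very topic you read with LongDeserializer, and builder.table(INTERMEDIATE_TOPIC_NAME) also reads the counts back in. A String value such as "blue" is 4 bytes, while LongDeserializer only accepts exactly 8. Point INTERMEDIATE_TOPIC_NAME at the compacted topic you created for it, user-keys-and-colours, so that favourite-colour-output only receives the Long counts. The String records already written there will stay, so with --from-beginning the consumer would hit them again; delete and recreate favourite-colour-output before re-running.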
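To make the mismatch concrete, here is a stdlib-only Java sketch (no Kafka dependency) that mimics the length check LongDeserializer performs and models favourite-colour-output as a single list receiving both the String record from the intermediate to() and a Long count. SharedTopicSketch, deserializeLong and consumeAsLongs are hypothetical names, and the real deserializer throws org.apache.kafka.common.errors.SerializationException rather than IllegalStateException.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SharedTopicSketch {

    // Mimics the check in Kafka's LongDeserializer: the value must be exactly 8 bytes,
    // decoded big-endian. (Hypothetical stand-in; the real class throws SerializationException.)
    static Long deserializeLong(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 8) {
            throw new IllegalStateException(
                "Size of data received by LongDeserializer is not 8 (got " + data.length + ")");
        }
        return ByteBuffer.wrap(data).getLong(); // ByteBuffer is big-endian by default
    }

    // Decodes every value in order; throws at the first value that is not 8 bytes.
    static List<Long> consumeAsLongs(List<byte[]> topic) {
        List<Long> decoded = new ArrayList<>();
        for (byte[] value : topic) {
            decoded.add(deserializeLong(value));
        }
        return decoded;
    }

    public static void main(String[] args) {
        // One list standing in for favourite-colour-output, written by both sinks.
        List<byte[]> favouriteColourOutput = new ArrayList<>();
        // usersAndColours.to(INTERMEDIATE_TOPIC_NAME): String value, 4 bytes for "blue"
        favouriteColourOutput.add("blue".getBytes(StandardCharsets.UTF_8));
        // favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, ...): Long value, 8 bytes
        favouriteColourOutput.add(ByteBuffer.allocate(8).putLong(1L).array());

        try {
            System.out.println(consumeAsLongs(favouriteColourOutput));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main prints the size complaint for the 4-byte "blue" value. With only 8-byte values in the list, consumeAsLongs decodes all of them, which is the state the output topic should be in once the intermediate records go to their own topic.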