我正在尝试一个窗口化的字数统计应用程序流,在消费者控制台中我有一些不可读的字符以及计数

i'm trying a windowed word count application streams, in consumer console i have some unreadable characters alongwith count

申请(.java)文件如下;

public class WordCountFinal {

    public static void main(String[] args) {

        StringSerializer stringSerializer = new StringSerializer();
        StringDeserializer stringDeserializer = new StringDeserializer();
        TimeWindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer<>(stringSerializer);
        TimeWindowedDeserializer<String> windowedDeserializer = new TimeWindowedDeserializer<>(stringDeserializer);
        Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);


        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "rogue");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ssc-vm-r.com:9092, ssc-vmr:9092, ssc-vm:9092");
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> wordcountinput = builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()));

        KGroupedStream<String, String> groupedStream = wordcountinput
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));

        KTable<Windowed<String>, Long> aggregatedStream = groupedStream
                .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
                .count();

        aggregatedStream.toStream().to("tuesdaystopic", Produced.with(windowedSerde, Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    }

}

生产者控制台的输入是句子或单词。输出应该类似于类似的 wordcount 应用程序,但 2 分钟后,假设到现在我的 'qwerty' 字数为 5。两分钟后,我再次在生产者控制台中输入 qwerty,我应该得到输出计数为 1。

qwerty 3

qwerty 4

qwerty 5

abcd 1

2 分钟后,在产品中输入了 qwerty。控制台

qwerty 1

请注意,结果的键类型是 Windowed<String>——这也是为什么在通过 to() 将结果流写入主题时使用 TimeWindowedSerializer 的原因(你不要使用 StringSerializer).

当您使用控制台消费者读取数据时,虽然您为键指定了 StringDeserializer,但是,键中的字节不是 String 类型,因此您得到了那些不可读的字符并且类型不匹配。

您可以指定不同的反序列化器(即,在使用控制台消费者时 TimeWindowedDeserializer,或者在将结果写入输出主题之前修改密钥以键入 String。例如,您可以使用:

aggregatedStream.toStream()
    // `k` is of type Windowed<String>
    // you can get the plain String key via `key()`
    .selectKey((k,v) -> k.key())
    .to(....)