我正在尝试一个窗口化的字数统计应用程序流,在消费者控制台中我有一些不可读的字符以及计数
i'm trying a windowed word count application streams, in consumer console i have some unreadable characters alongwith count
申请(.java)文件如下;
public class WordCountFinal {
public static void main(String[] args) {
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
TimeWindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer<>(stringSerializer);
TimeWindowedDeserializer<String> windowedDeserializer = new TimeWindowedDeserializer<>(stringDeserializer);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "rogue");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ssc-vm-r.com:9092, ssc-vmr:9092, ssc-vm:9092");
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> wordcountinput = builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()));
KGroupedStream<String, String> groupedStream = wordcountinput
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> aggregatedStream = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.count();
aggregatedStream.toStream().to("tuesdaystopic", Produced.with(windowedSerde, Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
生产者控制台的输入是句子或单词。输出应该类似于类似的 wordcount 应用程序,但 2 分钟后,假设到现在我的 'qwerty' 字数为 5。两分钟后,我再次在生产者控制台中输入 qwerty,我应该得到输出计数为 1。
qwerty 3
qwerty 4
qwerty 5
abcd 1
2 分钟后,在产品中输入了 qwerty。控制台
qwerty 1
请注意,结果的键类型是 Windowed<String>
——这也是为什么在通过 to()
将结果流写入主题时使用 TimeWindowedSerializer
的原因(你不要使用 StringSerializer
).
当您使用控制台消费者读取数据时,虽然您为键指定了 StringDeserializer
,但是,键中的字节不是 String
类型,因此您得到了那些不可读的字符并且类型不匹配。
您可以指定不同的反序列化器(即,在使用控制台消费者时 TimeWindowedDeserializer
,或者在将结果写入输出主题之前修改密钥以键入 String
。例如,您可以使用:
aggregatedStream.toStream()
// `k` is of type Windowed<String>
// you can get the plain String key via `key()`
.selectKey((k,v) -> k.key())
.to(....)
申请(.java)文件如下;
public class WordCountFinal {
public static void main(String[] args) {
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
TimeWindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer<>(stringSerializer);
TimeWindowedDeserializer<String> windowedDeserializer = new TimeWindowedDeserializer<>(stringDeserializer);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "rogue");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ssc-vm-r.com:9092, ssc-vmr:9092, ssc-vm:9092");
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> wordcountinput = builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()));
KGroupedStream<String, String> groupedStream = wordcountinput
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> aggregatedStream = groupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
.count();
aggregatedStream.toStream().to("tuesdaystopic", Produced.with(windowedSerde, Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
生产者控制台的输入是句子或单词。输出应该类似于类似的 wordcount 应用程序,但 2 分钟后,假设到现在我的 'qwerty' 字数为 5。两分钟后,我再次在生产者控制台中输入 qwerty,我应该得到输出计数为 1。
qwerty 3
qwerty 4
qwerty 5
abcd 1
2 分钟后,在产品中输入了 qwerty。控制台
qwerty 1
请注意,结果的键类型是 Windowed<String>
——这也是为什么在通过 to()
将结果流写入主题时使用 TimeWindowedSerializer
的原因(你不要使用 StringSerializer
).
当您使用控制台消费者读取数据时,虽然您为键指定了 StringDeserializer
,但是,键中的字节不是 String
类型,因此您得到了那些不可读的字符并且类型不匹配。
您可以指定不同的反序列化器(即,在使用控制台消费者时 TimeWindowedDeserializer
,或者在将结果写入输出主题之前修改密钥以键入 String
。例如,您可以使用:
aggregatedStream.toStream()
// `k` is of type Windowed<String>
// you can get the plain String key via `key()`
.selectKey((k,v) -> k.key())
.to(....)