Kafka Streams:及时获取事件计数 window
Kafka Streams : Get count of events in time window
我将数据流作为 事件。我想获得 10 分钟时间 windows 的事件计数并输出到另一个主题。以下是我的代码
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("events")
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(10000)))
.count()
.toStream()
.to("output");
但我收到错误
ClassCastException while producing data to topic output. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
windowedBy
加上 count()
的结果是类型为 <Windowed<String>, Long>
的键值对,因此您需要通过 [= 在 to()
中设置不同的 serde 14=] 参数。默认情况下,将使用配置中的 serdes,您似乎将其设置为 StringSerde/StringSerde
,而这些显然与输出主题 key/value 类型不匹配。
Kafka Streams 附带了用于窗口类型的内置 serdes,您可以通过 Serdes
工厂 class.
获得
我将数据流作为
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("events")
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(10000)))
.count()
.toStream()
.to("output");
但我收到错误
ClassCastException while producing data to topic output. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
windowedBy
加上 count()
的结果是类型为 <Windowed<String>, Long>
的键值对,因此您需要通过 [= 在 to()
中设置不同的 serde 14=] 参数。默认情况下,将使用配置中的 serdes,您似乎将其设置为 StringSerde/StringSerde
,而这些显然与输出主题 key/value 类型不匹配。
Kafka Streams 附带了用于窗口类型的内置 serdes,您可以通过 Serdes
工厂 class.