如何按小时聚合数据?
How to aggregate data hourly?
每当用户喜欢我们网站上的某些内容时,我们都会收集事件,我们计划做的是每小时提交一个内容的收藏夹并更新数据库中的收藏总数。
我们正在评估 Kafka Streams。遵循字数统计示例。我们的拓扑结构很简单,生成一个主题 A,读取聚合数据并将其提交到另一个主题 B。然后每小时使用一次来自主题 B 的事件并提交到数据库中。
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favorite-streams");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
return new StreamsConfig(props);
}
@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
StreamsBuilder builder = streamBuilder();
KStream<String, String> source = builder.stream(topic);
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")).toStream()
.to(topic + "-grouped", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, kStreamsConfigs());
streams.start();
return source;
}
@Bean
public StreamsBuilder streamBuilder() {
return new StreamsBuilder();
}
但是,当我使用此主题 B 时,它从一开始就为我提供汇总数据。我的问题是,我们能否制定一些规定,让我可以使用前几个小时的分组数据,然后提交给 DB,然后 Kakfa 忘记前几个小时的数据,并每小时提供新数据而不是累计总和。设计拓扑是否正确或我们可以做得更好?
如果您想每小时获得一个聚合结果,可以使用 windowed 聚合,window 大小为 1 小时。
stream.groupBy(...)
.windowedBy(TimeWindow.of(1 *3600 * 1000))
.count(...)
查看文档了解更多详情:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing
密钥的输出类型是 Windowed<String>
(不是 String
)。您需要提供一个自定义的 Window<String>
Serde,或者转换密钥类型。咨询SessionWindowsExample.
每当用户喜欢我们网站上的某些内容时,我们都会收集事件,我们计划做的是每小时提交一个内容的收藏夹并更新数据库中的收藏总数。
我们正在评估 Kafka Streams。遵循字数统计示例。我们的拓扑结构很简单,生成一个主题 A,读取聚合数据并将其提交到另一个主题 B。然后每小时使用一次来自主题 B 的事件并提交到数据库中。
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favorite-streams");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
return new StreamsConfig(props);
}
@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
StreamsBuilder builder = streamBuilder();
KStream<String, String> source = builder.stream(topic);
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")).toStream()
.to(topic + "-grouped", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, kStreamsConfigs());
streams.start();
return source;
}
@Bean
public StreamsBuilder streamBuilder() {
return new StreamsBuilder();
}
但是,当我使用此主题 B 时,它从一开始就为我提供汇总数据。我的问题是,我们能否制定一些规定,让我可以使用前几个小时的分组数据,然后提交给 DB,然后 Kakfa 忘记前几个小时的数据,并每小时提供新数据而不是累计总和。设计拓扑是否正确或我们可以做得更好?
如果您想每小时获得一个聚合结果,可以使用 windowed 聚合,window 大小为 1 小时。
stream.groupBy(...)
.windowedBy(TimeWindow.of(1 *3600 * 1000))
.count(...)
查看文档了解更多详情:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing
密钥的输出类型是 Windowed<String>
(不是 String
)。您需要提供一个自定义的 Window<String>
Serde,或者转换密钥类型。咨询SessionWindowsExample.