在 Kafka Streams 中聚合多个分区
Aggregate over multiple partitions in Kafka Streams
这部分是
的后续行动
假设我有一个名为 "events" 的主题,其中包含 3 个分区,我在其中发送字符串 -> 整数数据,如下所示:
(Bob, 3) 在分区 1
(Sally, 4) 在分区 2
(Bob, 2) 在分区 3
...
我想聚合所有分区的值(在这个例子中,只是一个简单的总和),最终得到一个 KTable
,看起来像:
(莎莉,4 岁)
(鲍勃,5 岁)
正如我在上面链接的问题的答案中提到的,直接进行这种跨分区聚合是不可能的。但是,回答者提到如果消息具有相同的密钥是可能的(在这种情况下是正确的)。如何实现?
我还希望能够从跨 Kafka Streams 应用程序的每个实例复制的 "global" 状态存储中查询这些聚合值。
我的第一个想法是使用GlobalKTable
(我相信,根据this page,应该是我需要的)。但是,此状态存储的变更日志主题与原始 "events" 主题具有相同数量的分区,并且只是在每个分区的基础上而不是跨所有分区进行聚合。
这是我的应用程序的精简版 - 不太确定从这里去哪里:
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
.groupByKey()
.aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);
aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.cleanUp();
streams.start();
builder.globalTable(METRIC_CHANGES_TOPIC, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE_NAME));
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
}));
Kafka Streams 假定输入主题是按键分区的。这个假设不适用于您的情况。因此,您需要将此告知 Kafka Streams。
在您的特定情况下,您可以将 groupByKey
替换为 groupBy()
KTable<String, Double> aggregatedMetrics = eventStream
.groupBy((k,v) -> k)
.aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);
lambda 是一个不修改键的虚拟对象,但是,它提示 Kafka Streams 在进行聚合之前根据键重新分区数据。
关于GlobalKTable
:这是一种特殊的table,它不是聚合的结果,而是仅从更新日志主题中填充。看来您的代码已经在做正确的事情:将聚合结果写入主题并重新读取主题为 GlobalKTable
.
这部分是
假设我有一个名为 "events" 的主题,其中包含 3 个分区,我在其中发送字符串 -> 整数数据,如下所示:
(Bob, 3) 在分区 1
(Sally, 4) 在分区 2
(Bob, 2) 在分区 3
...
我想聚合所有分区的值(在这个例子中,只是一个简单的总和),最终得到一个 KTable
,看起来像:
(莎莉,4 岁)
(鲍勃,5 岁)
正如我在上面链接的问题的答案中提到的,直接进行这种跨分区聚合是不可能的。但是,回答者提到如果消息具有相同的密钥是可能的(在这种情况下是正确的)。如何实现?
我还希望能够从跨 Kafka Streams 应用程序的每个实例复制的 "global" 状态存储中查询这些聚合值。
我的第一个想法是使用GlobalKTable
(我相信,根据this page,应该是我需要的)。但是,此状态存储的变更日志主题与原始 "events" 主题具有相同数量的分区,并且只是在每个分区的基础上而不是跨所有分区进行聚合。
这是我的应用程序的精简版 - 不太确定从这里去哪里:
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
.groupByKey()
.aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);
aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.cleanUp();
streams.start();
builder.globalTable(METRIC_CHANGES_TOPIC, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE_NAME));
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
}));
Kafka Streams 假定输入主题是按键分区的。这个假设不适用于您的情况。因此,您需要将此告知 Kafka Streams。
在您的特定情况下,您可以将 groupByKey
替换为 groupBy()
KTable<String, Double> aggregatedMetrics = eventStream
.groupBy((k,v) -> k)
.aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);
lambda 是一个不修改键的虚拟对象,但是,它提示 Kafka Streams 在进行聚合之前根据键重新分区数据。
关于GlobalKTable
:这是一种特殊的table,它不是聚合的结果,而是仅从更新日志主题中填充。看来您的代码已经在做正确的事情:将聚合结果写入主题并重新读取主题为 GlobalKTable
.