如何实现使用交互式查询存储和全局存储处理单个主题的 Kafka Streams 拓扑
How to implement Kafka Streams topology that process single topic with interactive queries store and global store
我正在尝试实现 Kafka Streams,它将单个主题流视为具有可能的交互式查询的全局数据库。所以我想要:
全局记录存储(GlobalKTable、KeyValueStore)
可查询存储,允许我获得交互式查询的结果(最大)
交互式查询必须计算记录字段之一的全局最大值:
KStream<String, TercUnitRecord> recordsStream = topologyBuilder.stream(topicName);
KTable<String, Long> lastUpdateStore = recordsStream.mapValues(record -> record.getLastUpdate())
.selectKey((key, value) -> "lastdate")
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce((maxValue, currValue) -> maxValue.compareTo(currValue) == 1 ? maxValue : currValue,
Materialized.as("terc-lastupdate"));
但是,我遇到了无法在一个 Kafka Streams 实例中使用同一个主题作为源的问题。我进行了研究,我发现唯一的方法是通过多个 KafkaStreams 实例,但我不确定这是实现此目的的正确且唯一的方法。有什么想法吗?
我为每个任务使用了多个 KafkaStreams 实例并且它工作正常。
我正在尝试实现 Kafka Streams,它将单个主题流视为具有可能的交互式查询的全局数据库。所以我想要:
全局记录存储(GlobalKTable、KeyValueStore)
可查询存储,允许我获得交互式查询的结果(最大)
交互式查询必须计算记录字段之一的全局最大值:
KStream<String, TercUnitRecord> recordsStream = topologyBuilder.stream(topicName);
KTable<String, Long> lastUpdateStore = recordsStream.mapValues(record -> record.getLastUpdate())
.selectKey((key, value) -> "lastdate")
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce((maxValue, currValue) -> maxValue.compareTo(currValue) == 1 ? maxValue : currValue,
Materialized.as("terc-lastupdate"));
但是,我遇到了无法在一个 Kafka Streams 实例中使用同一个主题作为源的问题。我进行了研究,我发现唯一的方法是通过多个 KafkaStreams 实例,但我不确定这是实现此目的的正确且唯一的方法。有什么想法吗?
我为每个任务使用了多个 KafkaStreams 实例并且它工作正常。