如何在 kafka 主题中只存储最新的键值
How to store only latest key values in a kafka topic
我有一个主题,有大量数据流向它。我需要的是从该主题创建一个单独的主题,该主题仅具有给定键的最新值集。
我认为 KTable 的全部目的是它将存储给定键的最新值,而不是存储整个事件流。但是我似乎无法让它工作。 运行 下面的代码生成密钥库,但该密钥库 (maintopiclatest) 中有一个事件流(不仅仅是最新值)。因此,如果我两次发送主题中包含 1000 条记录的请求,我看到的不是 1000 条记录,而是 2000 条记录。
var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();
var stream = kStreamBuilder.stream("maintopic",
Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));
var table = stream
.groupByKey()
.reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));
另一个问题是,如果我想将 KTable 存储在一个新主题中,我不知道该怎么做。为了做到这一点,我似乎必须将它转回 Stream 以便我可以在其上调用“.to”。但是那里面有整个事件流,而不仅仅是最新的值。
这不是 KTable 的工作方式。
KTable 本身有一个内部状态存储,每个键只存储一条记录。但是,一个KTable是不断更新的,服从所谓的stream-table-duality。 KTable 的每个更新都作为更改日志记录发送到下游:https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables。因此,每个输入记录都会产生一个输出记录。
因为是流处理所以没有"last key per value".
I have a topic that has a stream of data coming to it. What I need is to create a separate topic from this topic that only has the latest set of values given the keys.
您希望 KTable 在哪个时间点发出更新?这个问题没有答案,因为输入流在概念上是无限的。
我有一个主题,有大量数据流向它。我需要的是从该主题创建一个单独的主题,该主题仅具有给定键的最新值集。
我认为 KTable 的全部目的是它将存储给定键的最新值,而不是存储整个事件流。但是我似乎无法让它工作。 运行 下面的代码生成密钥库,但该密钥库 (maintopiclatest) 中有一个事件流(不仅仅是最新值)。因此,如果我两次发送主题中包含 1000 条记录的请求,我看到的不是 1000 条记录,而是 2000 条记录。
var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();
var stream = kStreamBuilder.stream("maintopic",
Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));
var table = stream
.groupByKey()
.reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));
另一个问题是,如果我想将 KTable 存储在一个新主题中,我不知道该怎么做。为了做到这一点,我似乎必须将它转回 Stream 以便我可以在其上调用“.to”。但是那里面有整个事件流,而不仅仅是最新的值。
这不是 KTable 的工作方式。
KTable 本身有一个内部状态存储,每个键只存储一条记录。但是,一个KTable是不断更新的,服从所谓的stream-table-duality。 KTable 的每个更新都作为更改日志记录发送到下游:https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables。因此,每个输入记录都会产生一个输出记录。
因为是流处理所以没有"last key per value".
I have a topic that has a stream of data coming to it. What I need is to create a separate topic from this topic that only has the latest set of values given the keys.
您希望 KTable 在哪个时间点发出更新?这个问题没有答案,因为输入流在概念上是无限的。