如何在 kafka 主题中只存储最新的键值

How to store only latest key values in a kafka topic

我有一个主题,有大量数据流向它。我需要的是从该主题创建一个单独的主题,该主题仅具有给定键的最新值集。

我认为 KTable 的全部目的是它将存储给定键的最新值,而不是存储整个事件流。但是我似乎无法让它工作。 运行 下面的代码生成密钥库,但该密钥库 (maintopiclatest) 中有一个事件流(不仅仅是最新值)。因此,如果我两次发送主题中包含 1000 条记录的请求,我看到的不是 1000 条记录,而是 2000 条记录。

var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();

var stream = kStreamBuilder.stream("maintopic",
    Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));

var table = stream
    .groupByKey()
    .reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));

另一个问题是,如果我想将 KTable 存储在一个新主题中,我不知道该怎么做。为了做到这一点,我似乎必须将它转回 Stream 以便我可以在其上调用“.to”。但是那里面有整个事件流,而不仅仅是最新的值。

这不是 KTable 的工作方式。

KTable 本身有一个内部状态存储,每个键只存储一条记录。但是,一个KTable是不断更新的,服从所谓的stream-table-duality。 KTable 的每个更新都作为更改日志记录发送到下游:https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables。因此,每个输入记录都会产生一个输出记录。

因为是流处理所以没有"last key per value".

I have a topic that has a stream of data coming to it. What I need is to create a separate topic from this topic that only has the latest set of values given the keys.

您希望 KTable 在哪个时间点发出更新?这个问题没有答案,因为输入流在概念上是无限的。