Kafka Streams - 在丰富的流中添加消息频率

Kafka Streams - adding message frequency in enriched stream

我想从流 (k,v) 计算流 (k, (v,f)),其中 f 是给定键在最后 n 秒内出现的频率。 给个题目(t1)如果我用windowedtable来计算频率:

KTable<Windowed<Integer>,Long> t1_velocity_table = t1_stream.groupByKey().windowedBy(TimeWindows.of(n*1000)).count();

这将给出一个窗口 table 以及每个键的频率。

假设我无法使用 Windowed 密钥加入,而不是上面的 table,我将流映射到 table,使用简单的密钥:

t1_Stream.groupByKey()
                .windowedBy(TimeWindows.of( n*1000)).count()
                .toStream().map((k,v)->new KeyValue<>(k.key(), Math.toIntExact(v))).to(frequency_topic);
KTable<Integer,Integer> t1_frequency_table = builder.table(frequency_topic);

如果我现在在这个 table 中查找,当一个新的密钥到达我的流时,我如何知道这个查找 table 将首先更新还是首先发生连接(这将导致将过时的频率添加到记录中,而不是当前更新的频率)。创建流而不是 table 然后进行窗口连接会更好吗? 我想用这样的东西查找 table:

KStream<Integer,Tuple<Integer,Integer>> t1_enriched = t1_Stream.join(t1_frequency_table, (l,r) -> new Tuple<>(l, r));

因此,我有一个 (k,(v,f)) 流,而不是只有一个 (k,v) 流,其中 f 是最后 n 秒内密钥 k 的频率。

关于实现此目标的正确方法有什么想法吗?谢谢。

对于您分享的特定节目,将首先处理流端记录。原因是,您通过主题管道传输数据...

当记录被处理时,它会更新聚合结果,该结果将发出一条更新记录,该更新记录被写入直通主题。紧接着,记录将由连接运算符处理。只有在之后,新的 poll() 调用才会最终从直通主题中读取聚合结果并更新连接的 table 端。

使用DSL,似乎无法达到你想要的效果。但是,您可以编写一个自定义 Transformer 来重新实现流-table 连接,提供您需要的语义。