Kafka Streams - 使用地址聚合和加入用户

Kafka Streams - aggregating and joining user with addresses

我有两个紧凑的主题。一个包含有关我的用户的所有信息 (USERID),另一个保存他们的地址 (USERID,ADRESSID).括号中是键。我想要的是仅将一个主题用户数据及其地址列表保存在一个主题中。我的做法是:

KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses = adressStream
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupByKey(...) //Grouping by USERID as key - this generates KGroupedStream
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key 
最后,我对用户执行 leftJoin 并通过 USERID aggregated_addresses 并将结果保存到名为 "user_addresses".

的压缩主题中

我想实现将所有数据及其地址保存在 user_addresses 中。这意味着我不想在一段时间后丢失任何地址。只有在数据库中删除了地址时。我的问题是我的方法是否适合实现这一目标。我的原型正在运行,它正在为每个用户保存一个地址列表,但我问自己 KGroupedStream 是否会在一段时间后删除一些流。

也许有人可以详细解释一下这个管道是如何工作的。如果一个新流(地址)进入,它会通过整个管道(selectKey、groupByKey、聚合)并最终出现在主题 aggregated_addresses 中,它被保存为地址列表?步骤聚合正在使用此语句:

(user, address, queue) -> {...}

Kafka streams是否使用aggregated_addresses填充上述语句的队列?我是,如果一个新流到达 .aggregate,Kafka 会在 aggregated_addresses 中搜索它们相应的聚合列表并用这些数据填充队列吗?还是使用 .groupByKey 的分组流,每次有新流进来时,发送整个分组流进行聚合?如果第二个为真,KGroupedStream 会在一周后删除一些流吗?如果是,队列中会缺少一些地址吗?

KGroupedStream 和 KGroupedTable 在内部有什么区别?

有趣的是,连接后的结果(在名为 user_addresses 的压缩主题中)的条目数比 table 用户的条目数多。我看得更深,发现具有相同密钥的用户多次出现(多个偏移量)。在最小的偏移量处,该用户没有地址,然后在较高的偏移量处,它在他的列表中有一个地址,而在最高偏移量时,它在他的列表中有两个地址。我再次问自己,当我使用压缩主题时,为什么旧​​偏移量没有自动删除。 Kafka 的压缩是否像垃圾收集器一样工作,然后删除数据?如果我正在搜索一个密钥,我会得到偏移量最大的密钥吗?

很抱歉问了这么多问题,但随着我越来越多地使用流,有些事情我不清楚。

在此先感谢您的帮助! :)

I am asking myself if KGroupedStream will remove some streams after a time or not.

它不会删除任何东西。

如果我理解你问题的其余部分,你是在问 aggregate() 运算符是如何工作的。它使用本地状态存储(使用 RocksDB 实现)来存储 <userId, X>,其中 X 是您的聚合 UDF ((user, address, queue) -> { }) returns,即它应该是 X == queue).因此,每个输入记录都会在 RocksDB 中进行本地查找以获取当前的 queue,更新它,将其写回 RocksDB 并将其发送到下游的 to() 运算符中,运算符也将其写入结果主题。

另请阅读文档以获取更多详细信息:https://kafka.apache.org/21/documentation/streams/还有很多其他 material 关于 Kafka Streams 及其在 Internet 上的工作方式(博客文章、谈话录音、幻灯片... )

It's interesting, that the result after the join (in a compacted topic called user_addresses) has more entries than the entries table user has. I looked deeper and saw, that user with the same key has multiple occurrences (multiple offsets). At the smallest offset this user has no addresses, then at a higher offset, it has one address in his list and the highest offset it has two addresses in his list. I am again asking myself, why are old offsets not automatically removed, when I am using a compacted topic. Is Kafka's compaction working like a garbage collector which is removing data in afterward? What if I am searching for a key, will I get the key with the highest offset?

压缩在后台异步完成,但不是立即完成。另请注意,主题(或更准确地说)分区被分成 "segment" 并且活动段永远不会被压缩(默认段大小为 1GB)。您可以配置段大小以及触发压缩的频率(阅读文档以获取更多详细信息:https://kafka.apache.org/documentation/#compaction)。

What if I am searching for a key, will I get the key with the highest offset?

不确定你的意思。 Kafka 只允许顺序读取,但不允许键查找。因此,您需要从头到尾阅读主题以找到密钥的最新版本。如果您参考 Kafka Streams "Interactive Queries" 功能,它将查询本地 RocksDB,因此包含每个键的最新条目。

My question is if my approach is a good one to achieve this.

是的,有一个重要的细节,与

有关

What is internally the difference between KGroupedStream and KGroupedTable?

因为您输入的主题是一个使用键 (userId,addressId) 的压缩主题,您应该将其读作 table()(而不是 stream()):

KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses =
    builder.table("address-topic")
      .selectKey(...) //Selecting USERID as key - this generates KStream
      .groupBy(...) //Select USERID as and group by USERID
      .aggregate(...) //Aggregating by USERID as key - this generates KTable
      .to("aggregated_addresses"); //KTable with USERID as key 

不同之处在于,如果您阅读一个主题 KStreams,则解释为 "facts",因此没有删除语义。但是,您输入的主题包含 "updates" 条记录,因此它应该是消费者本身。 KGroupedStreamKGroupedTable 只是 API 中的中间对象,也暗示了 "fact" 与 "update" 语义。同样,在 Internet 上查看文档和更多内容 material 以了解更多详细信息。