Kafka Streams - 使用地址聚合和加入用户
Kafka Streams - aggregating and joining user with addresses
我有两个紧凑的主题。一个包含有关我的用户的所有信息 (USERID),另一个保存他们的地址 (USERID,ADRESSID).括号中是键。我想要的是仅将一个主题用户数据及其地址列表保存在一个主题中。我的做法是:
KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses = adressStream
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupByKey(...) //Grouping by USERID as key - this generates KGroupedStream
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key
最后,我对用户执行 leftJoin 并通过 USERID aggregated_addresses 并将结果保存到名为 "user_addresses". 的压缩主题中
我想实现将所有数据及其地址保存在 user_addresses 中。这意味着我不想在一段时间后丢失任何地址。只有在数据库中删除了地址时。我的问题是我的方法是否适合实现这一目标。我的原型正在运行,它正在为每个用户保存一个地址列表,但我问自己 KGroupedStream 是否会在一段时间后删除一些流。
也许有人可以详细解释一下这个管道是如何工作的。如果一个新流(地址)进入,它会通过整个管道(selectKey、groupByKey、聚合)并最终出现在主题 aggregated_addresses 中,它被保存为地址列表?步骤聚合正在使用此语句:
(user, address, queue) -> {...}
Kafka streams是否使用aggregated_addresses填充上述语句的队列?我是,如果一个新流到达 .aggregate,Kafka 会在 aggregated_addresses 中搜索它们相应的聚合列表并用这些数据填充队列吗?还是使用 .groupByKey 的分组流,每次有新流进来时,发送整个分组流进行聚合?如果第二个为真,KGroupedStream 会在一周后删除一些流吗?如果是,队列中会缺少一些地址吗?
KGroupedStream 和 KGroupedTable 在内部有什么区别?
有趣的是,连接后的结果(在名为 user_addresses 的压缩主题中)的条目数比 table 用户的条目数多。我看得更深,发现具有相同密钥的用户多次出现(多个偏移量)。在最小的偏移量处,该用户没有地址,然后在较高的偏移量处,它在他的列表中有一个地址,而在最高偏移量时,它在他的列表中有两个地址。我再次问自己,当我使用压缩主题时,为什么旧偏移量没有自动删除。 Kafka 的压缩是否像垃圾收集器一样工作,然后删除数据?如果我正在搜索一个密钥,我会得到偏移量最大的密钥吗?
很抱歉问了这么多问题,但随着我越来越多地使用流,有些事情我不清楚。
在此先感谢您的帮助! :)
I am asking myself if KGroupedStream will remove some streams after a time or not.
它不会删除任何东西。
如果我理解你问题的其余部分,你是在问 aggregate()
运算符是如何工作的。它使用本地状态存储(使用 RocksDB 实现)来存储 <userId, X>
,其中 X
是您的聚合 UDF ((user, address, queue) -> { }
) returns,即它应该是 X == queue
).因此,每个输入记录都会在 RocksDB 中进行本地查找以获取当前的 queue
,更新它,将其写回 RocksDB 并将其发送到下游的 to()
运算符中,运算符也将其写入结果主题。
另请阅读文档以获取更多详细信息:https://kafka.apache.org/21/documentation/streams/还有很多其他 material 关于 Kafka Streams 及其在 Internet 上的工作方式(博客文章、谈话录音、幻灯片... )
It's interesting, that the result after the join (in a compacted topic called user_addresses) has more entries than the entries table user has. I looked deeper and saw, that user with the same key has multiple occurrences (multiple offsets). At the smallest offset this user has no addresses, then at a higher offset, it has one address in his list and the highest offset it has two addresses in his list. I am again asking myself, why are old offsets not automatically removed, when I am using a compacted topic. Is Kafka's compaction working like a garbage collector which is removing data in afterward? What if I am searching for a key, will I get the key with the highest offset?
压缩在后台异步完成,但不是立即完成。另请注意,主题(或更准确地说)分区被分成 "segment" 并且活动段永远不会被压缩(默认段大小为 1GB)。您可以配置段大小以及触发压缩的频率(阅读文档以获取更多详细信息:https://kafka.apache.org/documentation/#compaction)。
What if I am searching for a key, will I get the key with the highest offset?
不确定你的意思。 Kafka 只允许顺序读取,但不允许键查找。因此,您需要从头到尾阅读主题以找到密钥的最新版本。如果您参考 Kafka Streams "Interactive Queries" 功能,它将查询本地 RocksDB,因此包含每个键的最新条目。
My question is if my approach is a good one to achieve this.
是的,有一个重要的细节,与
有关
What is internally the difference between KGroupedStream and KGroupedTable?
因为您输入的主题是一个使用键 (userId,addressId)
的压缩主题,您应该将其读作 table()
(而不是 stream()
):
KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses =
builder.table("address-topic")
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupBy(...) //Select USERID as and group by USERID
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key
不同之处在于,如果您阅读一个主题 KStreams
,则解释为 "facts",因此没有删除语义。但是,您输入的主题包含 "updates" 条记录,因此它应该是消费者本身。 KGroupedStream
和 KGroupedTable
只是 API 中的中间对象,也暗示了 "fact" 与 "update" 语义。同样,在 Internet 上查看文档和更多内容 material 以了解更多详细信息。
我有两个紧凑的主题。一个包含有关我的用户的所有信息 (USERID),另一个保存他们的地址 (USERID,ADRESSID).括号中是键。我想要的是仅将一个主题用户数据及其地址列表保存在一个主题中。我的做法是:
KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses = adressStream
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupByKey(...) //Grouping by USERID as key - this generates KGroupedStream
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key
我想实现将所有数据及其地址保存在 user_addresses 中。这意味着我不想在一段时间后丢失任何地址。只有在数据库中删除了地址时。我的问题是我的方法是否适合实现这一目标。我的原型正在运行,它正在为每个用户保存一个地址列表,但我问自己 KGroupedStream 是否会在一段时间后删除一些流。
也许有人可以详细解释一下这个管道是如何工作的。如果一个新流(地址)进入,它会通过整个管道(selectKey、groupByKey、聚合)并最终出现在主题 aggregated_addresses 中,它被保存为地址列表?步骤聚合正在使用此语句:
(user, address, queue) -> {...}
Kafka streams是否使用aggregated_addresses填充上述语句的队列?我是,如果一个新流到达 .aggregate,Kafka 会在 aggregated_addresses 中搜索它们相应的聚合列表并用这些数据填充队列吗?还是使用 .groupByKey 的分组流,每次有新流进来时,发送整个分组流进行聚合?如果第二个为真,KGroupedStream 会在一周后删除一些流吗?如果是,队列中会缺少一些地址吗?
KGroupedStream 和 KGroupedTable 在内部有什么区别?
有趣的是,连接后的结果(在名为 user_addresses 的压缩主题中)的条目数比 table 用户的条目数多。我看得更深,发现具有相同密钥的用户多次出现(多个偏移量)。在最小的偏移量处,该用户没有地址,然后在较高的偏移量处,它在他的列表中有一个地址,而在最高偏移量时,它在他的列表中有两个地址。我再次问自己,当我使用压缩主题时,为什么旧偏移量没有自动删除。 Kafka 的压缩是否像垃圾收集器一样工作,然后删除数据?如果我正在搜索一个密钥,我会得到偏移量最大的密钥吗?
很抱歉问了这么多问题,但随着我越来越多地使用流,有些事情我不清楚。
在此先感谢您的帮助! :)
I am asking myself if KGroupedStream will remove some streams after a time or not.
它不会删除任何东西。
如果我理解你问题的其余部分,你是在问 aggregate()
运算符是如何工作的。它使用本地状态存储(使用 RocksDB 实现)来存储 <userId, X>
,其中 X
是您的聚合 UDF ((user, address, queue) -> { }
) returns,即它应该是 X == queue
).因此,每个输入记录都会在 RocksDB 中进行本地查找以获取当前的 queue
,更新它,将其写回 RocksDB 并将其发送到下游的 to()
运算符中,运算符也将其写入结果主题。
另请阅读文档以获取更多详细信息:https://kafka.apache.org/21/documentation/streams/还有很多其他 material 关于 Kafka Streams 及其在 Internet 上的工作方式(博客文章、谈话录音、幻灯片... )
It's interesting, that the result after the join (in a compacted topic called user_addresses) has more entries than the entries table user has. I looked deeper and saw, that user with the same key has multiple occurrences (multiple offsets). At the smallest offset this user has no addresses, then at a higher offset, it has one address in his list and the highest offset it has two addresses in his list. I am again asking myself, why are old offsets not automatically removed, when I am using a compacted topic. Is Kafka's compaction working like a garbage collector which is removing data in afterward? What if I am searching for a key, will I get the key with the highest offset?
压缩在后台异步完成,但不是立即完成。另请注意,主题(或更准确地说)分区被分成 "segment" 并且活动段永远不会被压缩(默认段大小为 1GB)。您可以配置段大小以及触发压缩的频率(阅读文档以获取更多详细信息:https://kafka.apache.org/documentation/#compaction)。
What if I am searching for a key, will I get the key with the highest offset?
不确定你的意思。 Kafka 只允许顺序读取,但不允许键查找。因此,您需要从头到尾阅读主题以找到密钥的最新版本。如果您参考 Kafka Streams "Interactive Queries" 功能,它将查询本地 RocksDB,因此包含每个键的最新条目。
My question is if my approach is a good one to achieve this.
是的,有一个重要的细节,与
有关What is internally the difference between KGroupedStream and KGroupedTable?
因为您输入的主题是一个使用键 (userId,addressId)
的压缩主题,您应该将其读作 table()
(而不是 stream()
):
KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses =
builder.table("address-topic")
.selectKey(...) //Selecting USERID as key - this generates KStream
.groupBy(...) //Select USERID as and group by USERID
.aggregate(...) //Aggregating by USERID as key - this generates KTable
.to("aggregated_addresses"); //KTable with USERID as key
不同之处在于,如果您阅读一个主题 KStreams
,则解释为 "facts",因此没有删除语义。但是,您输入的主题包含 "updates" 条记录,因此它应该是消费者本身。 KGroupedStream
和 KGroupedTable
只是 API 中的中间对象,也暗示了 "fact" 与 "update" 语义。同样,在 Internet 上查看文档和更多内容 material 以了解更多详细信息。