使用查找数据丰富 KStream 的理想方式
Ideal way to enrich a KStream with lookup data
我的流有一个名为 'category' 的列,我在不同的商店中为每个 'category' 都有额外的静态元数据,它每两天更新一次。执行此查找的正确方法是什么? Kafka 流有两个选项
在 Kafka Streams 之外加载静态数据,只需使用 KStreams#map()
添加元数据。这是可能的,因为 Kafka Streams 只是一个库。
将元数据加载到 Kafka 主题,将其加载到 KTable
并执行 KStreams#leftJoin()
,这看起来更自然并将分区等留给 Kafka Streams。但是,这需要我们保持 KTable
加载所有值。请注意,我们必须加载整个查找数据,而不仅仅是更改。
- 例如,假设最初只有一个类别 'c1'。 Kafka 流应用程序已正常停止,并再次重新启动。重启后,新增了一个分类'c2'。我的假设是,table = KStreamBuilder().table('metadataTopic') 将只具有值 'c2',因为这是自应用程序启动以来唯一发生变化的事情第二次。我希望它有 'c1' 和 'c2'.
- 如果它也有 'c1',数据是否会从 KTable 中删除(可能通过设置发送 key = null 消息?)?
以上哪项是查找元数据的正确方法?
是否可以在重新启动时始终强制只从头读取一个流,这样所有元数据都可以加载到 KTable
。
还有其他使用商店的方法吗?
您的整体观察是正确的,这取决于哪些权衡对您来说更重要。如果您的元数据很小,选项 1 似乎更好。如果元数据很大,似乎选项 2 是可行的方法。
如果您使用 map()
,您需要在每个应用程序实例中拥有元数据的完整副本(因为您无法确切知道 Streams 将如何对您的 KStream
数据进行分区)。因此,如果您的元数据不适合主内存使用 map()
将不会很容易工作。
如果您使用 KTable
,Streams 会确保元数据在所有 运行 应用程序实例上正确分片,这样就不需要重复数据。此外,KTable
使用 RocksDB 作为状态存储引擎,因此可以溢出到磁盘。
编辑开始
关于在 KTable
中包含所有数据:如果同一个键有两个类别,如果您直接从主题中将数据读入 KTable
,则第二个值将覆盖第一个值通过 builder.table(...)
(变更日志语义)。但是,您可以通过将主题读取为记录流(即 builder.stream(...)
并应用聚合来计算 KTable
来轻松解决此问题。您的聚合将简单地为每个键发出所有值的列表.
关于删除:KTable
使用变更日志语义并理解删除键值对的逻辑删除消息。因此,如果您从主题中读取 KTable
并且该主题包含 <key:null>
消息,则 KTable
中具有此键的当前记录将被删除。当 KTable
是聚合的结果时,这更难实现,因为具有 null
键或 null
值的聚合输入记录将被忽略并且不会更新聚合结果。
解决方法是在聚合之前添加一个 map()
步骤并引入一个 NULL
值(即用户定义的 "object" 表示墓碑但不是 null
——在你的例子中,你可以称它为 null-category
)。在您的聚合中,如果输入记录具有 null-category
作为值,您只需 return 一个 null
值作为聚合结果。然后,这将转换为您的 KTable
的逻辑删除消息,并删除该键的当前类别列表。
编辑结束
当然,您始终可以通过处理器 API 构建自定义解决方案。但是,如果 DSL 可以满足您的需求,就没有理由这样做。
- Load static data outside of Kafka Streams and just use KStreams#map() to add metadata. This is possible as Kafka Streams is just a library.
这行得通。但通常人们会选择您列出的下一个选项,因为用于丰富输入流的辅助数据通常不是完全静态的;相反,它正在发生变化,但并不频繁:
- Load the metadata to a Kafka topic, load it to a KTable and do KStreams#leftJoin(), this seems more natural and leaves partitioning etc to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.
这是通常的方法,我建议您坚持使用,除非您有特殊原因不这样做。
However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.
所以我猜你也更喜欢第二种选择,但你担心这样是否有效。
简短的回答是:是的,KTable 将加载每个键的所有(最新)值。 table 将包含整个查找数据,但请记住 KTable 在幕后进行分区:例如,如果您的输入主题(对于 table)有 3
个分区,那么您最多可以 运行 您的应用程序的 3
个实例,每个实例都获得 table 的 1
分区(假设数据在分区之间均匀分布,那么每个 [ table 的 =105=] 将包含 table 的大约 1/3 的数据)。所以在实践中更有可能 "just works"。我在下面分享更多细节。
全局 KTables: 或者,您可以使用 global KTables 而不是(分区的)正常 table 变体。使用全局 tables,应用程序的每个实例都有 table 数据的完整副本。这使得全局 tables 对于加入场景非常有用,包括根据您的问题丰富 KStream。
Is it possible to always force just one stream to be read from the beginning on restarts, this is so that all the metadata can be loaded into KTable.
这个你不用担心。简单地说,如果 table 的本地 "copy" 不可用,那么 Streams API 将自动确保从头开始完整读取 table 的数据。如果有可用的本地副本,那么您的应用程序将重新使用该副本(并在 table 的输入主题中有新数据可用时更新其本地副本)。
带有示例的更长答案
想象一下 KTable
的以下输入数据(想想:更新日志流),注意这个输入是如何由 6
消息组成的:
(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)
这是由此输入产生的 "logical" KTable
的各种状态,其中每个新收到的输入消息(例如 (alice, 1)
)都会产生一个新的table 的状态:
Key Value
--------------
alice | 1 // (alice, 1) received
|
V
Key Value
--------------
alice | 1
bob | 40 // (bob, 40) received
|
V
Key Value
--------------
alice | 2 // (alice, 2) received
bob | 40
|
V
Key Value
--------------
alice | 2
bob | 40
charlie | 600 // (charlie, 600) received
|
V
Key Value
--------------
alice | 5 // (alice, 5) received
bob | 40
charlie | 600
|
V
Key Value
--------------
alice | 5
bob | 22 // (bob, 22) received
charlie | 600
你在这里可以看到,即使输入数据可能有很多很多消息(或如你所说的 "changes";这里,我们有 6
),[结果 KTable
中的 =106=](正在根据新收到的输入进行连续突变)是输入中唯一键的数量(此处:从 1
开始,逐渐增加到 3
),这通常明显少于消息的数量。因此,如果输入中的消息数为 N
,并且这些消息的唯一键数为 M
,则通常 M << N
(M
明显小于 N
; 另外,为了记录,我们有不变量 M <= N
).
这是 "this requires us to keep the KTable loaded with all the values" 通常不是问题的第一个原因,因为每个键只保留最新值。
第二个有用的原因是,正如 Matthias J. Sax 指出的那样,Kafka Streams 使用 RocksDB 作为此类 table 的默认存储引擎(更准确地说:状态存储支持 table). RocksDB 允许您维护 table 大于可用主内存 / Java 堆 space 的应用程序,因为它可以溢出到本地磁盘。
最后第三个原因是KTable
分区了。因此,如果 table 的输入主题(比如说)配置了 3
分区,那么幕后发生的事情是 KTable
本身在同样的方式。在上面的示例中,这是您最终可能得到的结果,尽管确切的 "splits" 取决于原始输入数据如何分布在 table 的输入主题的分区中:
逻辑 KTable(我上面显示的最后状态):
Key Value
--------------
alice | 5
bob | 22
charlie | 600
实际的 KTable,已分区(假设 table 的输入主题有 3
个分区,加上键=用户名均匀分布在各个分区中):
Key Value
--------------
alice | 5 // Assuming that all data for `alice` is in partition 1
Key Value
--------------
bob | 22 // ...for `bob` is in partition 2
Key Value
--------------
charlie | 600 // ...for `charlie` is in partition 3
实际上,输入数据的这种分区——除其他外——允许您"size" KTable 的实际表现。
另一个例子:
- 假设您的 KTable 的最新状态通常有 1 TB 的大小(同样,近似大小是 table 输入数据中唯一消息键的数量乘以关联消息值的平均大小)。
- 如果table的输入主题只有
1
个分区,那么KTable本身也只有1
个分区,大小为1TB。在这里,因为输入主题只有 1
个分区,所以您可以 运行 您的应用程序最多 1
个应用程序实例(所以并不是真的有很多并行性,呵呵)。
- 如果 table 的输入主题有
500
个分区,那么 KTable 也有 500
个分区,每个分区大小约为 2 GB(假设数据均匀分布分布在分区中)。在这里,您可以 运行 您的应用程序最多 500
个应用程序实例。如果您要 运行 恰好 500
个实例,那么每个应用程序实例将恰好获得逻辑 KTable 的 1
partition/shard,因此最终会得到 2 GB 的 table 数据;如果你要 运行 只有 100
个实例,那么每个实例都会得到 table 的 500 / 100 = 5
partitions/shards,最后大约 2 GB * 5 = 10 GB
table数据。
从 2017 年 2 月发布的 Kafka 0.10.2.0 开始,GlobalKTable
概念可能是使用查找数据丰富流的更好选择。
https://docs.confluent.io/current/streams/concepts.html#globalktable
我的流有一个名为 'category' 的列,我在不同的商店中为每个 'category' 都有额外的静态元数据,它每两天更新一次。执行此查找的正确方法是什么? Kafka 流有两个选项
在 Kafka Streams 之外加载静态数据,只需使用
KStreams#map()
添加元数据。这是可能的,因为 Kafka Streams 只是一个库。将元数据加载到 Kafka 主题,将其加载到
KTable
并执行KStreams#leftJoin()
,这看起来更自然并将分区等留给 Kafka Streams。但是,这需要我们保持KTable
加载所有值。请注意,我们必须加载整个查找数据,而不仅仅是更改。- 例如,假设最初只有一个类别 'c1'。 Kafka 流应用程序已正常停止,并再次重新启动。重启后,新增了一个分类'c2'。我的假设是,table = KStreamBuilder().table('metadataTopic') 将只具有值 'c2',因为这是自应用程序启动以来唯一发生变化的事情第二次。我希望它有 'c1' 和 'c2'.
- 如果它也有 'c1',数据是否会从 KTable 中删除(可能通过设置发送 key = null 消息?)?
以上哪项是查找元数据的正确方法?
是否可以在重新启动时始终强制只从头读取一个流,这样所有元数据都可以加载到 KTable
。
还有其他使用商店的方法吗?
您的整体观察是正确的,这取决于哪些权衡对您来说更重要。如果您的元数据很小,选项 1 似乎更好。如果元数据很大,似乎选项 2 是可行的方法。
如果您使用 map()
,您需要在每个应用程序实例中拥有元数据的完整副本(因为您无法确切知道 Streams 将如何对您的 KStream
数据进行分区)。因此,如果您的元数据不适合主内存使用 map()
将不会很容易工作。
如果您使用 KTable
,Streams 会确保元数据在所有 运行 应用程序实例上正确分片,这样就不需要重复数据。此外,KTable
使用 RocksDB 作为状态存储引擎,因此可以溢出到磁盘。
编辑开始
关于在 KTable
中包含所有数据:如果同一个键有两个类别,如果您直接从主题中将数据读入 KTable
,则第二个值将覆盖第一个值通过 builder.table(...)
(变更日志语义)。但是,您可以通过将主题读取为记录流(即 builder.stream(...)
并应用聚合来计算 KTable
来轻松解决此问题。您的聚合将简单地为每个键发出所有值的列表.
关于删除:KTable
使用变更日志语义并理解删除键值对的逻辑删除消息。因此,如果您从主题中读取 KTable
并且该主题包含 <key:null>
消息,则 KTable
中具有此键的当前记录将被删除。当 KTable
是聚合的结果时,这更难实现,因为具有 null
键或 null
值的聚合输入记录将被忽略并且不会更新聚合结果。
解决方法是在聚合之前添加一个 map()
步骤并引入一个 NULL
值(即用户定义的 "object" 表示墓碑但不是 null
——在你的例子中,你可以称它为 null-category
)。在您的聚合中,如果输入记录具有 null-category
作为值,您只需 return 一个 null
值作为聚合结果。然后,这将转换为您的 KTable
的逻辑删除消息,并删除该键的当前类别列表。
编辑结束
当然,您始终可以通过处理器 API 构建自定义解决方案。但是,如果 DSL 可以满足您的需求,就没有理由这样做。
- Load static data outside of Kafka Streams and just use KStreams#map() to add metadata. This is possible as Kafka Streams is just a library.
这行得通。但通常人们会选择您列出的下一个选项,因为用于丰富输入流的辅助数据通常不是完全静态的;相反,它正在发生变化,但并不频繁:
- Load the metadata to a Kafka topic, load it to a KTable and do KStreams#leftJoin(), this seems more natural and leaves partitioning etc to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.
这是通常的方法,我建议您坚持使用,除非您有特殊原因不这样做。
However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.
所以我猜你也更喜欢第二种选择,但你担心这样是否有效。
简短的回答是:是的,KTable 将加载每个键的所有(最新)值。 table 将包含整个查找数据,但请记住 KTable 在幕后进行分区:例如,如果您的输入主题(对于 table)有 3
个分区,那么您最多可以 运行 您的应用程序的 3
个实例,每个实例都获得 table 的 1
分区(假设数据在分区之间均匀分布,那么每个 [ table 的 =105=] 将包含 table 的大约 1/3 的数据)。所以在实践中更有可能 "just works"。我在下面分享更多细节。
全局 KTables: 或者,您可以使用 global KTables 而不是(分区的)正常 table 变体。使用全局 tables,应用程序的每个实例都有 table 数据的完整副本。这使得全局 tables 对于加入场景非常有用,包括根据您的问题丰富 KStream。
Is it possible to always force just one stream to be read from the beginning on restarts, this is so that all the metadata can be loaded into KTable.
这个你不用担心。简单地说,如果 table 的本地 "copy" 不可用,那么 Streams API 将自动确保从头开始完整读取 table 的数据。如果有可用的本地副本,那么您的应用程序将重新使用该副本(并在 table 的输入主题中有新数据可用时更新其本地副本)。
带有示例的更长答案
想象一下 KTable
的以下输入数据(想想:更新日志流),注意这个输入是如何由 6
消息组成的:
(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)
这是由此输入产生的 "logical" KTable
的各种状态,其中每个新收到的输入消息(例如 (alice, 1)
)都会产生一个新的table 的状态:
Key Value
--------------
alice | 1 // (alice, 1) received
|
V
Key Value
--------------
alice | 1
bob | 40 // (bob, 40) received
|
V
Key Value
--------------
alice | 2 // (alice, 2) received
bob | 40
|
V
Key Value
--------------
alice | 2
bob | 40
charlie | 600 // (charlie, 600) received
|
V
Key Value
--------------
alice | 5 // (alice, 5) received
bob | 40
charlie | 600
|
V
Key Value
--------------
alice | 5
bob | 22 // (bob, 22) received
charlie | 600
你在这里可以看到,即使输入数据可能有很多很多消息(或如你所说的 "changes";这里,我们有 6
),[结果 KTable
中的 =106=](正在根据新收到的输入进行连续突变)是输入中唯一键的数量(此处:从 1
开始,逐渐增加到 3
),这通常明显少于消息的数量。因此,如果输入中的消息数为 N
,并且这些消息的唯一键数为 M
,则通常 M << N
(M
明显小于 N
; 另外,为了记录,我们有不变量 M <= N
).
这是 "this requires us to keep the KTable loaded with all the values" 通常不是问题的第一个原因,因为每个键只保留最新值。
第二个有用的原因是,正如 Matthias J. Sax 指出的那样,Kafka Streams 使用 RocksDB 作为此类 table 的默认存储引擎(更准确地说:状态存储支持 table). RocksDB 允许您维护 table 大于可用主内存 / Java 堆 space 的应用程序,因为它可以溢出到本地磁盘。
最后第三个原因是KTable
分区了。因此,如果 table 的输入主题(比如说)配置了 3
分区,那么幕后发生的事情是 KTable
本身在同样的方式。在上面的示例中,这是您最终可能得到的结果,尽管确切的 "splits" 取决于原始输入数据如何分布在 table 的输入主题的分区中:
逻辑 KTable(我上面显示的最后状态):
Key Value
--------------
alice | 5
bob | 22
charlie | 600
实际的 KTable,已分区(假设 table 的输入主题有 3
个分区,加上键=用户名均匀分布在各个分区中):
Key Value
--------------
alice | 5 // Assuming that all data for `alice` is in partition 1
Key Value
--------------
bob | 22 // ...for `bob` is in partition 2
Key Value
--------------
charlie | 600 // ...for `charlie` is in partition 3
实际上,输入数据的这种分区——除其他外——允许您"size" KTable 的实际表现。
另一个例子:
- 假设您的 KTable 的最新状态通常有 1 TB 的大小(同样,近似大小是 table 输入数据中唯一消息键的数量乘以关联消息值的平均大小)。
- 如果table的输入主题只有
1
个分区,那么KTable本身也只有1
个分区,大小为1TB。在这里,因为输入主题只有1
个分区,所以您可以 运行 您的应用程序最多1
个应用程序实例(所以并不是真的有很多并行性,呵呵)。 - 如果 table 的输入主题有
500
个分区,那么 KTable 也有500
个分区,每个分区大小约为 2 GB(假设数据均匀分布分布在分区中)。在这里,您可以 运行 您的应用程序最多500
个应用程序实例。如果您要 运行 恰好500
个实例,那么每个应用程序实例将恰好获得逻辑 KTable 的1
partition/shard,因此最终会得到 2 GB 的 table 数据;如果你要 运行 只有100
个实例,那么每个实例都会得到 table 的500 / 100 = 5
partitions/shards,最后大约2 GB * 5 = 10 GB
table数据。
从 2017 年 2 月发布的 Kafka 0.10.2.0 开始,GlobalKTable
概念可能是使用查找数据丰富流的更好选择。
https://docs.confluent.io/current/streams/concepts.html#globalktable