KafkaStreams 中的本地状态存储
Local State Storage in KafkaStreams
我有一个非常简单的 KafkaStreams 应用程序。它看起来像这样:
input topic --> extract smth., update aggregate in the local state -> output topic
一开始输入的主题只有1个分区,一切都很顺利。
但是在我增加输入主题中的分区数量后,我发现我的应用程序是按分区实例化的,因此我的输出主题有多个更新(每个分区 1 个更新)而不是单个更新。
我该如何处理这种情况?我希望我的应用只生成所有输入分区的一个聚合。
Kafka Streams 按分区并行化,因此,您只能通过使用单个分区主题来执行此操作。
如果您无法控制输入主题的主题数量,您可以创建一个具有单个分区的中间主题,并通过该分区路由所有数据。
KStream multiPartitionInputStream = ...
multiPartitionInputStream.through("single-partitioned-topic")...
Note: doing a global aggregation does not scale horizontally, thus, this pattern should be used with care.
更新:
对于处理器 API 用户,您还可以通过配置 "partition.grouper"
提供自定义 PartitionGrouper
,创建单个任务并为该任务分配 both/all 个分区。
Note: providing a custom PartitionGrouper for DSL usage is highly discouraged because one needs to understand many internal details and assumptions to write a correct PartitionGrouper
.
我有一个非常简单的 KafkaStreams 应用程序。它看起来像这样:
input topic --> extract smth., update aggregate in the local state -> output topic
一开始输入的主题只有1个分区,一切都很顺利。
但是在我增加输入主题中的分区数量后,我发现我的应用程序是按分区实例化的,因此我的输出主题有多个更新(每个分区 1 个更新)而不是单个更新。
我该如何处理这种情况?我希望我的应用只生成所有输入分区的一个聚合。
Kafka Streams 按分区并行化,因此,您只能通过使用单个分区主题来执行此操作。
如果您无法控制输入主题的主题数量,您可以创建一个具有单个分区的中间主题,并通过该分区路由所有数据。
KStream multiPartitionInputStream = ...
multiPartitionInputStream.through("single-partitioned-topic")...
Note: doing a global aggregation does not scale horizontally, thus, this pattern should be used with care.
更新:
对于处理器 API 用户,您还可以通过配置 "partition.grouper"
提供自定义 PartitionGrouper
,创建单个任务并为该任务分配 both/all 个分区。
Note: providing a custom PartitionGrouper for DSL usage is highly discouraged because one needs to understand many internal details and assumptions to write a correct
PartitionGrouper
.