Kafka KTable——跨机器共享聚合
Kafka KTable - shared aggregation across machines
假设我有一个包含多个分区的主题。我在其中写入 K/V 数据,并希望通过按键在 Tumbling Windows 中聚合所述数据。
假设我已经启动了与我有分区一样多的工作实例,并且每个工作实例在单独的机器上 运行。
我将如何确保生成的聚合包含每个键的 所有 值? IE 我不希望每个工作实例都有一些值的子集。
这是 StateStore 的用途吗? Kafka 是自己管理这个还是我需要想出一个方法?
对于 worker 实例,我假设您指的是 Kafka Streams 应用程序实例,对吗? (因为 Kafka Streams 中没有 master/worker 模式——它是一个库而不是框架——我们不使用术语 "worker"。)
如果要按键共同定位数据,则需要按键对数据进行分区。因此,当数据从一开始就被写入主题时,您的数据将由您的外部生产者按键分区。或者您在 Kafka Streams 应用程序中明确设置一个新密钥(例如使用 selectKey()
或 map()
)并通过调用 through()
重新分发。
(在未来的版本中将不再需要显式调用 through()
,即 0.10.1
并且 Kafka Streams 将在必要时自动重新分发记录。)
如果要对messages/record进行分区,key一定不能是null
。您还可以通过生产者配置更改分区模式 partitioner.class
(参见 https://kafka.apache.org/documentation.html#producerconfigs)。
分区完全独立于 StateStores,即使 StateStores 通常用于分区数据之上。
How would I go about insuring that the resultant aggregations include all values for each key? IE I don't want each worker instance to have some subset of the values.
通常,Kafka Streams 确保同一键的所有值将由同一(且只有一个)流任务处理,这也意味着只有一个应用程序实例(您描述为 "worker instance")将处理该键的值。请注意,一个应用程序实例可能 运行 1+ 流任务,但这些任务是孤立的。
这种行为是通过数据的分区实现的,Kafka Streams确保一个分区总是由同一个流任务处理。 link 到 keys/values 的逻辑是,在 Kafka 和 Kafka Streams 中,一个密钥总是被发送到同一个分区(这里有一个陷阱,但我不确定是否有意义去进入此问题范围的详细信息),因此一个特定的分区 - 在可能的许多分区中 - 包含同一键的所有值。
在某些情况下,例如加入两个流 A
和 B
时,您必须确保聚合将对相同的键进行操作,以确保来自两个流的数据是共同的位于同一个流任务中——这又是关于确保相关的输入流分区并因此匹配键(分别来自 A
和 B
)在同一个流中可用任务。您在这里使用的典型方法是 selectKey()
。一旦完成,Kafka Streams 确保,为了连接两个流 A 和 B 以及创建连接的输出流,相同键的所有值将由相同的流任务处理,因此相同的应用程序实例。
示例:
- 流
A
具有键 userId
,值为 { georegion }
。
- 流
B
具有键 georegion
,值为 { continent, description }
。
仅当两个流使用相同的密钥时,加入两个流才有效(从 Kafka 0.10.0 开始)。在此示例中,这意味着您必须重新键入(并因此重新分区)流 A
,以便生成的键从 userId
更改为 georegion
。否则,从 Kafka 0.10 开始,您无法连接 A
和 B
,因为数据没有位于负责实际执行连接的流任务中。
在此示例中,您可以 re-key/re-partition 流式传输 A
通过:
// Kafka 0.10.0.x (latest stable release as of Sep 2016)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic")
// Upcoming versions of Kafka (not released yet)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId))
只有在 Kafka 0.10.0 中才需要调用 through()
来实际触发重新分区,而更高版本的 Kafka 会自动为您执行这些操作(这个即将推出的功能已经完成并在 Kafka 中可用trunk
).
Is this something that a StateStore would be used for? Does Kafka manage this on its own or do I need to come up with a method?
一般来说,不会。上述行为是通过 分区 而不是通过状态存储实现的。
有时由于您为流定义的操作而涉及状态存储,这可能解释了您问这个问题的原因。例如,窗口操作需要管理状态,因此将在幕后创建状态存储。但是您的实际问题 - "insuring that the resultant aggregations include all values for each key" - 与状态存储无关,而是关于分区行为。
假设我有一个包含多个分区的主题。我在其中写入 K/V 数据,并希望通过按键在 Tumbling Windows 中聚合所述数据。
假设我已经启动了与我有分区一样多的工作实例,并且每个工作实例在单独的机器上 运行。
我将如何确保生成的聚合包含每个键的 所有 值? IE 我不希望每个工作实例都有一些值的子集。
这是 StateStore 的用途吗? Kafka 是自己管理这个还是我需要想出一个方法?
对于 worker 实例,我假设您指的是 Kafka Streams 应用程序实例,对吗? (因为 Kafka Streams 中没有 master/worker 模式——它是一个库而不是框架——我们不使用术语 "worker"。)
如果要按键共同定位数据,则需要按键对数据进行分区。因此,当数据从一开始就被写入主题时,您的数据将由您的外部生产者按键分区。或者您在 Kafka Streams 应用程序中明确设置一个新密钥(例如使用 selectKey()
或 map()
)并通过调用 through()
重新分发。
(在未来的版本中将不再需要显式调用 through()
,即 0.10.1
并且 Kafka Streams 将在必要时自动重新分发记录。)
如果要对messages/record进行分区,key一定不能是null
。您还可以通过生产者配置更改分区模式 partitioner.class
(参见 https://kafka.apache.org/documentation.html#producerconfigs)。
分区完全独立于 StateStores,即使 StateStores 通常用于分区数据之上。
How would I go about insuring that the resultant aggregations include all values for each key? IE I don't want each worker instance to have some subset of the values.
通常,Kafka Streams 确保同一键的所有值将由同一(且只有一个)流任务处理,这也意味着只有一个应用程序实例(您描述为 "worker instance")将处理该键的值。请注意,一个应用程序实例可能 运行 1+ 流任务,但这些任务是孤立的。
这种行为是通过数据的分区实现的,Kafka Streams确保一个分区总是由同一个流任务处理。 link 到 keys/values 的逻辑是,在 Kafka 和 Kafka Streams 中,一个密钥总是被发送到同一个分区(这里有一个陷阱,但我不确定是否有意义去进入此问题范围的详细信息),因此一个特定的分区 - 在可能的许多分区中 - 包含同一键的所有值。
在某些情况下,例如加入两个流 A
和 B
时,您必须确保聚合将对相同的键进行操作,以确保来自两个流的数据是共同的位于同一个流任务中——这又是关于确保相关的输入流分区并因此匹配键(分别来自 A
和 B
)在同一个流中可用任务。您在这里使用的典型方法是 selectKey()
。一旦完成,Kafka Streams 确保,为了连接两个流 A 和 B 以及创建连接的输出流,相同键的所有值将由相同的流任务处理,因此相同的应用程序实例。
示例:
- 流
A
具有键userId
,值为{ georegion }
。 - 流
B
具有键georegion
,值为{ continent, description }
。
仅当两个流使用相同的密钥时,加入两个流才有效(从 Kafka 0.10.0 开始)。在此示例中,这意味着您必须重新键入(并因此重新分区)流 A
,以便生成的键从 userId
更改为 georegion
。否则,从 Kafka 0.10 开始,您无法连接 A
和 B
,因为数据没有位于负责实际执行连接的流任务中。
在此示例中,您可以 re-key/re-partition 流式传输 A
通过:
// Kafka 0.10.0.x (latest stable release as of Sep 2016)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic")
// Upcoming versions of Kafka (not released yet)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId))
只有在 Kafka 0.10.0 中才需要调用 through()
来实际触发重新分区,而更高版本的 Kafka 会自动为您执行这些操作(这个即将推出的功能已经完成并在 Kafka 中可用trunk
).
Is this something that a StateStore would be used for? Does Kafka manage this on its own or do I need to come up with a method?
一般来说,不会。上述行为是通过 分区 而不是通过状态存储实现的。
有时由于您为流定义的操作而涉及状态存储,这可能解释了您问这个问题的原因。例如,窗口操作需要管理状态,因此将在幕后创建状态存储。但是您的实际问题 - "insuring that the resultant aggregations include all values for each key" - 与状态存储无关,而是关于分区行为。