Kafka Streams 处理器分区重新分配行为
KafkaStreams processor partition re-assignment behavior
假设我有一个基本的 KafkaStreams 应用程序,其中包含一个主题(具有多个分区)和一种处理消息的处理器类型,如下所示:
builder.stream(topic)
.process(() -> new MyProcessor());
下面的情况会不会发生?对于 MyProcessor 的特定实例,说 M(即通过调用处理器供应商获得的特定 java 对象),对于主题的特定分区,说 P,
- 在某个时间 t1,M 收到来自 P[=41= 的消息]
- 稍后 t2,P 从 M 中撤销,因此 M 不再接收来自 P 的消息(例如,因为启动了一个处理 P 的额外工作程序)
- 稍后,t3,M 再次收到来自 P[ 的消息=41=]
我检查了 documentation 关于流任务如何与 Kafka 主题分区相关的内容,但我没有找到关于这如何与处理器实例的构造和删除相关的详细信息 and/or (un-)重新平衡时将主题分区分配给现有处理器。
在 Kafka Streams 中,“处理单元”称为 流任务。
任务可以是有状态的and/or无状态的。当重新平衡事件发生时,应用程序的一个实例(例如,M
)上的任务 运行 可能会移动到应用程序的另一个实例。
主题分区和流任务之间存在 1-1 映射,这保证了一个且只有一个任务将处理来自特定分区的数据。例如,如果任务 3 负责在分区 P
中读取和处理,那么当任务 3 从实例 M
移动到另一个实例 M'
时,那么 M
将停止读取 P
(因为它不再运行任务 3),并且 M'
(现在运行任务 3)将 resume/continue 处理 P
.
- at some time t1, M receives messages from P
假设负责处理主题分区 P
的流任务称为 task(P)
。在时间 t1,M
恰好是 运行 task(P)
的应用程序实例。这就是上面第 1 点的情况。
- at a later point t2, P is revoked from M so M does not receive messages from P anymore (e.g. because an extra worker was started which handles P)
此处,应用程序的另一个实例(您将此实例称为“额外工作人员”)负责 运行 task(P)
。此处,task(P)
将自动从原始应用程序实例 M
迁移到新实例 M'
。由 task(P)
管理的任何状态(例如,当任务正在执行诸如连接或聚合之类的有状态操作时)当然会与任务一起迁移。迁移 task(P)
时,读取和处理主题分区 P
的责任也将从应用程序实例 M
转移到 M'
。
也许不要在“哪个应用程序实例正在处理主题分区P
”方面考虑太多?相反,一个特定的分区总是由一个特定的流任务处理,并且流任务可以跨应用程序实例移动。 (当然,Kafka 的 Streams API 将防止不必要的任务迁移以确保您的应用程序处理保持高效。)
- at a later point, t3, M receives messages again from P
这意味着,在时间 t3,由于另一个重新平衡事件,M
再次被分配了任务 task(P)
——可能是因为另一个应用程序实例 M'
已关闭,或发生其他需要任务迁移的事情。
Asked in the comments to this answer: It would also be useful though to have a sentence or two about the state migration. It's not like the binary/physical data is taken from one RocksDB instance and passed over to another. Obviously, the state is re-built based on fault-tolerance mechanism.
有状态任务使用状态存储来持久化状态信息。这些状态存储是容错的。这些状态存储的真实来源是 Kafka 本身:对状态存储的任何更改(例如,递增计数器)都以流式方式备份到 Kafka——类似于将数据库的 CDC 流存储 table 到Kafka 主题(这些是普通主题,但通常称为 'changelog topics')。然后,当一个任务终止或迁移到另一个 container/VM/machine 时,通过从 Kafka 读取,任务的状态存储将在任务的新 container/VM/machine 中恢复(想想:streaming-backup / streaming-restore) .这会将状态存储恢复到它们在原始容器中的样子,没有任何数据丢失或重复。
流任务使用 RocksDB 在本地实现状态存储(如在任务的容器中)以进行优化。将这些本地 RocksDB 实例视为可以在数据安全方面丢失的缓存,因为如上所述,状态数据的持久存储是 Kafka。
假设我有一个基本的 KafkaStreams 应用程序,其中包含一个主题(具有多个分区)和一种处理消息的处理器类型,如下所示:
builder.stream(topic)
.process(() -> new MyProcessor());
下面的情况会不会发生?对于 MyProcessor 的特定实例,说 M(即通过调用处理器供应商获得的特定 java 对象),对于主题的特定分区,说 P,
- 在某个时间 t1,M 收到来自 P[=41= 的消息]
- 稍后 t2,P 从 M 中撤销,因此 M 不再接收来自 P 的消息(例如,因为启动了一个处理 P 的额外工作程序)
- 稍后,t3,M 再次收到来自 P[ 的消息=41=]
我检查了 documentation 关于流任务如何与 Kafka 主题分区相关的内容,但我没有找到关于这如何与处理器实例的构造和删除相关的详细信息 and/or (un-)重新平衡时将主题分区分配给现有处理器。
在 Kafka Streams 中,“处理单元”称为 流任务。
任务可以是有状态的and/or无状态的。当重新平衡事件发生时,应用程序的一个实例(例如,M
)上的任务 运行 可能会移动到应用程序的另一个实例。
主题分区和流任务之间存在 1-1 映射,这保证了一个且只有一个任务将处理来自特定分区的数据。例如,如果任务 3 负责在分区 P
中读取和处理,那么当任务 3 从实例 M
移动到另一个实例 M'
时,那么 M
将停止读取 P
(因为它不再运行任务 3),并且 M'
(现在运行任务 3)将 resume/continue 处理 P
.
- at some time t1, M receives messages from P
假设负责处理主题分区 P
的流任务称为 task(P)
。在时间 t1,M
恰好是 运行 task(P)
的应用程序实例。这就是上面第 1 点的情况。
- at a later point t2, P is revoked from M so M does not receive messages from P anymore (e.g. because an extra worker was started which handles P)
此处,应用程序的另一个实例(您将此实例称为“额外工作人员”)负责 运行 task(P)
。此处,task(P)
将自动从原始应用程序实例 M
迁移到新实例 M'
。由 task(P)
管理的任何状态(例如,当任务正在执行诸如连接或聚合之类的有状态操作时)当然会与任务一起迁移。迁移 task(P)
时,读取和处理主题分区 P
的责任也将从应用程序实例 M
转移到 M'
。
也许不要在“哪个应用程序实例正在处理主题分区P
”方面考虑太多?相反,一个特定的分区总是由一个特定的流任务处理,并且流任务可以跨应用程序实例移动。 (当然,Kafka 的 Streams API 将防止不必要的任务迁移以确保您的应用程序处理保持高效。)
- at a later point, t3, M receives messages again from P
这意味着,在时间 t3,由于另一个重新平衡事件,M
再次被分配了任务 task(P)
——可能是因为另一个应用程序实例 M'
已关闭,或发生其他需要任务迁移的事情。
Asked in the comments to this answer: It would also be useful though to have a sentence or two about the state migration. It's not like the binary/physical data is taken from one RocksDB instance and passed over to another. Obviously, the state is re-built based on fault-tolerance mechanism.
有状态任务使用状态存储来持久化状态信息。这些状态存储是容错的。这些状态存储的真实来源是 Kafka 本身:对状态存储的任何更改(例如,递增计数器)都以流式方式备份到 Kafka——类似于将数据库的 CDC 流存储 table 到Kafka 主题(这些是普通主题,但通常称为 'changelog topics')。然后,当一个任务终止或迁移到另一个 container/VM/machine 时,通过从 Kafka 读取,任务的状态存储将在任务的新 container/VM/machine 中恢复(想想:streaming-backup / streaming-restore) .这会将状态存储恢复到它们在原始容器中的样子,没有任何数据丢失或重复。
流任务使用 RocksDB 在本地实现状态存储(如在任务的容器中)以进行优化。将这些本地 RocksDB 实例视为可以在数据安全方面丢失的缓存,因为如上所述,状态数据的持久存储是 Kafka。