从重新平衡接口 (PartitionAssignor) 的分配方法中为消费者组创建一个新的消费者
Create a new consumer for a consumer group from within the assign method of the rebalancing interface (PartitionAssignor )
我是 运行 Kubernetes 上的 Kafka 集群。我正在实施自定义 PartitionAssignor
来个性化主题分区分配给消费者组中现有消费者的方式。为此,我重写了方法 Map<String, Assignment> assign( Cluster metadata Map<String, Subscription> subscriptions)
如果在 assign
方法中我通过 Kubernetes 客户端 API 动态创建了一个新的消费者,那么在这种情况下再平衡协议将如何表现。准确地说,当新创建的consumer向group coordinator发送joinGroup请求时(rebalancing过程还在进行中),当前正在进行的rebalancing是否完成,然后触发新的rebalance过程来容纳新创建的consumer?
对于那些可能对答案感兴趣的人,我将问题发布在 Kafka 邮件列表中,以下是 Confluent People 的回答:
答案:
如果您在 assign 方法中创建一个新的消费者(并假设您实际上
开始用它轮询,以便它发送 JoinGroup 请求),然后是的,它
需要一个新的再平衡来适应这个消费者。小组协调员将
通知所有现有成员重新加入该组,以便重新平衡可以继续进行
当前组的最新视图。
我是 运行 Kubernetes 上的 Kafka 集群。我正在实施自定义 PartitionAssignor
来个性化主题分区分配给消费者组中现有消费者的方式。为此,我重写了方法 Map<String, Assignment> assign( Cluster metadata Map<String, Subscription> subscriptions)
如果在 assign
方法中我通过 Kubernetes 客户端 API 动态创建了一个新的消费者,那么在这种情况下再平衡协议将如何表现。准确地说,当新创建的consumer向group coordinator发送joinGroup请求时(rebalancing过程还在进行中),当前正在进行的rebalancing是否完成,然后触发新的rebalance过程来容纳新创建的consumer?
对于那些可能对答案感兴趣的人,我将问题发布在 Kafka 邮件列表中,以下是 Confluent People 的回答:
答案:
如果您在 assign 方法中创建一个新的消费者(并假设您实际上 开始用它轮询,以便它发送 JoinGroup 请求),然后是的,它 需要一个新的再平衡来适应这个消费者。小组协调员将 通知所有现有成员重新加入该组,以便重新平衡可以继续进行 当前组的最新视图。