Kafka 自定义分区 - 具有自定义每个分区用户数据的消费者分配器
Kafka custom partition - consumers assignor with custom per partition user data
我正在 Kafka 中为 topics/partitions assignor 实现自定义消费者。为此,我重写了 AbstractPartitionAssignor
抽象 class,后者又实现了 ConsumerPartitionAssignor
接口。
作为自定义分配器的一部分,我想发送关于消费者订阅的每个主题的每个分区的单个(浮动)信息。
我知道我可以通过覆盖默认方法将自定义数据发送给转让人
ByteBuffer subscriptionUserData(Set<String> topics)
的 ConsumerPartitionAssignor
界面。
但是,问题是从上面的方法签名中,我无法为消费者注册的每个主题获取分配给带下划线的消费者的分区列表。
另一方面,我可以看到每个消费者发送给组协调器的订阅 class 都有每个消费者拥有的分区列表。
public static final class Subscription {
private final List<String> topics;
private final ByteBuffer userData;
private final List<TopicPartition> ownedPartitions;
private Optional<String> groupInstanceId;
.....
关于如何通过方法 ByteBuffer subscriptionUserData(Set<String> topics)
或使用仅依赖于 kafka 的任何其他方式向组协调器发送 自定义每个分区数据 的任何提示 public API。
谢谢。
要解决上述问题,只需使用Assignor
回调函数onAssignment
,如下所示
...
private List<TopicPartition> memberAssignment = null;
@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
memberAssignment = assignment.partitions();
this.generation = metadata.generationId();
}
并在函数 subscriptionUserData(或分配器 class 中的任何位置)中使用 memberAssignment
来获取当前分配给每个消费者的分区列表。
public ByteBuffer subscriptionUserData(Set<String> topics)
我正在 Kafka 中为 topics/partitions assignor 实现自定义消费者。为此,我重写了 AbstractPartitionAssignor
抽象 class,后者又实现了 ConsumerPartitionAssignor
接口。
作为自定义分配器的一部分,我想发送关于消费者订阅的每个主题的每个分区的单个(浮动)信息。
我知道我可以通过覆盖默认方法将自定义数据发送给转让人
ByteBuffer subscriptionUserData(Set<String> topics)
的 ConsumerPartitionAssignor
界面。
但是,问题是从上面的方法签名中,我无法为消费者注册的每个主题获取分配给带下划线的消费者的分区列表。
另一方面,我可以看到每个消费者发送给组协调器的订阅 class 都有每个消费者拥有的分区列表。
public static final class Subscription {
private final List<String> topics;
private final ByteBuffer userData;
private final List<TopicPartition> ownedPartitions;
private Optional<String> groupInstanceId;
.....
关于如何通过方法 ByteBuffer subscriptionUserData(Set<String> topics)
或使用仅依赖于 kafka 的任何其他方式向组协调器发送 自定义每个分区数据 的任何提示 public API。
谢谢。
要解决上述问题,只需使用Assignor
回调函数onAssignment
,如下所示
...
private List<TopicPartition> memberAssignment = null;
@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
memberAssignment = assignment.partitions();
this.generation = metadata.generationId();
}
并在函数 subscriptionUserData(或分配器 class 中的任何位置)中使用 memberAssignment
来获取当前分配给每个消费者的分区列表。
public ByteBuffer subscriptionUserData(Set<String> topics)