Kafka - not all consumers receive subscribed messages
To publish messages generically with Kafka, I use the class name as the topic:
kafkaProducer.send(new ProducerRecord<String, String>(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));
and consumers subscribe to the classes they are interested in:
List<String> topics = new ArrayList<>();
for (Object sub : _subscriptions) {
    topics.add(sub.getClass().getName());
}
_kafkaConsumer.subscribe(topics);
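The producer and consumer code above derive topic names the same way, from `getClass().getName()`. A minimal sketch of keeping that mapping in one place follows; `TopicNames`, `topicFor` and `topicsFor` are hypothetical names, not part of the original code or of the Kafka API. One caveat it handles: Kafka topic names may only contain ASCII letters, digits, `.`, `_` and `-` (at most 249 characters), while `Class.getName()` uses `$` for nested classes, so this sketch replaces `$` with `-` and rejects anything else that is still illegal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class TopicNames {
    // Characters Kafka accepts in a topic name, and its maximum length.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    // Hypothetical helper: maps a class to a topic name. Nested classes come back as
    // e.g. "java.util.Map$Entry"; '$' is not legal in a topic name, so it becomes '-'.
    static String topicFor(Class<?> type) {
        String name = type.getName().replace('$', '-');
        if (name.length() > MAX_LENGTH || !LEGAL.matcher(name).matches()) {
            // e.g. array classes such as "[Ljava.lang.String;" end up here
            throw new IllegalArgumentException("Not a legal Kafka topic name: " + name);
        }
        return name;
    }

    // Builds the subscription list from the subscriber objects with the same mapping
    // the producer uses, so both sides agree on every topic name.
    static List<String> topicsFor(Iterable<?> subscriptions) {
        List<String> topics = new ArrayList<>();
        for (Object sub : subscriptions) {
            topics.add(topicFor(sub.getClass()));
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(topicFor(String.class));              // java.lang.String
        System.out.println(topicFor(java.util.Map.Entry.class)); // java.util.Map-Entry
        System.out.println(topicsFor(List.of("a", 1)));          // [java.lang.String, java.lang.Integer]
    }
}
```

The producer would then send to `TopicNames.topicFor(object.getClass())` and the consumer would call `subscribe(TopicNames.topicsFor(_subscriptions))`.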
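The problem is that only one consumer receives the subscribed messages. My understanding was that Kafka assigns each subscriber its own partition (if one is available). I currently have only 2 subscribers, and my Kafka server.properties specifies 4 partitions. It looks as though all consumers are reading from the same partition. Because of this apparent limitation, Kafka may not be a good choice for a service bus. Any help would be greatly appreciated!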
Kafka consumer properties:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
properties.put("group.id", "TestGroup");
properties.put("auto.offset.reset","earliest");
Kafka producer properties:
properties.put("bootstrap.servers",_settings.getEndpoint());
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Server properties (the only change I made to the defaults):
num.partitions=4
Note: I also tried the consumer with these settings:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.commit.interval.ms","1000");
properties.put("enable.auto.commit", "true");
properties.put("group.id", "testGroup");
properties.put("auto.offset.reset","latest");
properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
By default, the Kafka consumer uses RangeAssignor as its partition assignment strategy, which works as follows:
The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumers in lexicographic order. We then divide the number of partitions by the total number of consumers to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. The assignment will be:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
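The range strategy described above can be sketched as a small pure-Java model. This is a simplified illustration, not the Kafka client's `RangeAssignor` source: it assumes every consumer subscribes to every topic and ignores details such as rack awareness. `RangeAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignmentSketch {
    // Simplified model of range assignment; assumes all consumers subscribe to all topics.
    // partitionsPerTopic: topic name -> partition count; consumers: member ids (non-empty).
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null); // consumers in lexicographic order
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String c : sorted) assignment.put(c, new ArrayList<>());

        // Each topic is handled on its own, independently of the others.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int numPartitions = topic.getValue();
            int perConsumer = numPartitions / sorted.size();
            int extra = numPartitions % sorted.size(); // the first `extra` consumers get one more
            int next = 0; // partitions handed out in numeric order
            for (int i = 0; i < sorted.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int j = 0; j < count; j++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "p" + next++);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(Map.of("t0", 3, "t1", 3), List.of("C0", "C1")));
        // {C0=[t0p0, t0p1, t1p0, t1p1], C1=[t0p2, t1p2]}
    }
}
```

For the question's setup, `assign(Map.of("t0", 4), List.of("C0", "C1"))` gives each consumer two of the four partitions, so within a single group every record is still read by only one of the two consumers.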
If you want a more even distribution when there are only a few partitions, you can use RoundRobinAssignor by setting partition.assignment.strategy.
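For comparison, here is a simplified model of round-robin assignment on the same two-consumer, two-topic example used in the range description. It is a sketch, not the client's `RoundRobinAssignor` code, and it assumes all consumers subscribe to the same topics: all partitions of all topics are laid out in order and dealt to the sorted consumers one at a time. `RoundRobinAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinAssignmentSketch {
    // Simplified model of round-robin assignment; assumes identical subscriptions.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null); // consumers in lexicographic order
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String c : sorted) assignment.put(c, new ArrayList<>());

        // Deal every partition of every topic to the next consumer in turn.
        int turn = 0;
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                String consumer = sorted.get(turn++ % sorted.size());
                assignment.get(consumer).add(topic.getKey() + "p" + p);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(Map.of("t0", 3, "t1", 3), List.of("C0", "C1")));
        // {C0=[t0p0, t0p2, t1p1], C1=[t0p1, t1p0, t1p2]}
    }
}
```

Each consumer now gets three partitions instead of four and two. Note that the strategy only changes how partitions are split among the members of one group; it does not make every consumer receive every message.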
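If all of your consumers share the same consumer group (the group.id property), each message is delivered to only one consumer in that group. If you want every consumer to receive every message, each one needs a different group.id.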
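The group.id rule can be illustrated with a toy model. This is not Kafka client code: `GroupDeliverySketch`, `Consumer` and `deliveries` are hypothetical, and partition ownership inside a group is assigned round-robin here purely for simplicity (real consumers use the configured assignor). The model counts how many records each consumer would receive when eight records are spread over the four partitions from `num.partitions=4`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupDeliverySketch {
    // A consumer in the model: a name plus its group.id.
    record Consumer(String name, String groupId) {}

    // recordPartitions holds the partition of each published record. Within a group every
    // partition has exactly one owner, so each record reaches one member of each group.
    static Map<String, Integer> deliveries(List<Consumer> consumers, List<Integer> recordPartitions) {
        Map<String, List<Consumer>> groups = new LinkedHashMap<>();
        for (Consumer c : consumers) {
            groups.computeIfAbsent(c.groupId(), g -> new ArrayList<>()).add(c);
        }
        Map<String, Integer> received = new LinkedHashMap<>();
        for (Consumer c : consumers) received.put(c.name(), 0);

        for (List<Consumer> members : groups.values()) {
            for (int partition : recordPartitions) {
                Consumer owner = members.get(partition % members.size()); // simplified ownership
                received.merge(owner.name(), 1, Integer::sum);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(0, 1, 2, 3, 0, 1, 2, 3);

        // Same group.id: the records are split, so each one is delivered once in total.
        System.out.println(deliveries(
                List.of(new Consumer("A", "TestGroup"), new Consumer("B", "TestGroup")), records));
        // {A=4, B=4}

        // Different group.id values: every consumer receives every record.
        System.out.println(deliveries(
                List.of(new Consumer("A", "groupA"), new Consumer("B", "groupB")), records));
        // {A=8, B=8}
    }
}
```

For a service bus, this suggests giving each subscribing service its own group.id (for example one derived from a service name, which is an assumption about how the application identifies its subscribers) and sharing a group.id only among instances that should split the work.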
To see which consumers are assigned to which topic partitions, you can use the following command:
./bin/kafka-consumer-groups.sh --bootstrap-server yourhost:9092 --group testGroup --describe