Kafka消费者集群环境偏移量
Kafka Consumer Cluster Environment Offset
我正在尝试让 x 数量的消费者访问 kafka 中的指定主题但不使用相同的消息。我想要例如...
消费者 1 取货偏移量 1
消费者 2 取货偏移量 2
消费者 1 取货偏移量 3
消费者 2 取货偏移量 4
我希望 kafka 充当这两个消费者的队列。我注意到 group.id 配置,我假设您可以使用同一个组,它会相应地处理它,但它似乎不像我想象的那样工作。
这是我正在使用的代码...
public void init(){
Properties props = new Properties();
props.put("bootstrap.servers", kafkaUrl);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");
props.put("group.id", "group1");
props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("event1", "event2"));
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
}
public void pollTopics() {
try {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
if(processor != null) {
kafkaThreadPool.execute(processor);
}
}
}catch (Exception e){
LOG.error("Polling exception occurred", e);
}
}
我希望能够在集群环境中运行此代码并让 kafka 成为队列。我想让它在拉取消息的同时去到下一个offset,那么下一个kafka poll会去抓取下一个offset。这可能吗?如果是这样,我做错了什么?
这在 Kafka 中是不可能的(以您描述的方式)。
如果使用消费者组,单个分区只能由单个消费者读取。因此,Kafka 确实按分区进行扩展,即,如果你想要多个消费者(读取不同的数据),你需要为每个消费者至少一个分区。如果你有比消费者更多的分区,一些(或所有)消费者会同时读取多个分区。
您的解决方案是,创建一个具有多个分区的主题(或使用多个主题并让您组的所有消费者订阅一个主题)。
我正在尝试让 x 数量的消费者访问 kafka 中的指定主题但不使用相同的消息。我想要例如...
消费者 1 取货偏移量 1 消费者 2 取货偏移量 2 消费者 1 取货偏移量 3 消费者 2 取货偏移量 4
我希望 kafka 充当这两个消费者的队列。我注意到 group.id 配置,我假设您可以使用同一个组,它会相应地处理它,但它似乎不像我想象的那样工作。
这是我正在使用的代码...
public void init(){
Properties props = new Properties();
props.put("bootstrap.servers", kafkaUrl);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");
props.put("group.id", "group1");
props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("event1", "event2"));
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
}
public void pollTopics() {
try {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
if(processor != null) {
kafkaThreadPool.execute(processor);
}
}
}catch (Exception e){
LOG.error("Polling exception occurred", e);
}
}
我希望能够在集群环境中运行此代码并让 kafka 成为队列。我想让它在拉取消息的同时去到下一个offset,那么下一个kafka poll会去抓取下一个offset。这可能吗?如果是这样,我做错了什么?
这在 Kafka 中是不可能的(以您描述的方式)。
如果使用消费者组,单个分区只能由单个消费者读取。因此,Kafka 确实按分区进行扩展,即,如果你想要多个消费者(读取不同的数据),你需要为每个消费者至少一个分区。如果你有比消费者更多的分区,一些(或所有)消费者会同时读取多个分区。
您的解决方案是,创建一个具有多个分区的主题(或使用多个主题并让您组的所有消费者订阅一个主题)。