Spring Kafka 轮询 @KafkaListener 和侦听器 ack-mode 设置为记录
Spring Kafka polling with @KafkaListener and listener ack-mode set as record
我正在使用@KafkaListener 和 ConcurrentKafkaListenerContainerFactory 来收听 3 个 kafka 主题,每个主题有 10 个分区。我对这是如何工作的几乎没有疑问。
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(30);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
我的 listener.ackmode 是 return 并且 enable.auto.commit 设置为 false 并且 partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
1) 我对并发的理解是,因为我将并发(在工厂级别)设置为 30,并且我总共有 30 个分区(对于所有三个主题)要读取,每个线程都将被分配一个分区。我的理解正确吗?如果我在 @KafkaListener 注释中再次覆盖并发,它会有什么影响?
2) spring 调用 poll() 方法时,是否从所有三个主题进行轮询?
3) 由于我将 listener.ackmode 设置为 return,它是否会等到在单个 poll() 中 returned 的所有记录完成后再发出下一个民意调查()?另外,如果我的记录处理时间超过 max.poll.interval.ms 会怎样?假设在单个 poll() 调用中 return 编辑了 1-100 个偏移量,而我的代码在 max.poll.interval.ms 被命中之前只能处理 50 个,此时 spring 会发出另一个轮询因为它已经达到 max.poll.interval.ms?如果是这样,下一个 poll() return 会从偏移量 51 开始记录吗?
非常感谢您的时间和帮助
my listener.ackmode is return
没有这样的ackmode;因为你没有在工厂设置它,所以你的实际 ack 模式是 BATCH (默认)。要使用 ack 模式记录(如果那是你的意思),你必须配置工厂容器属性。
my understanding about concurrency is ...
您的理解有误;并发度不能大于分区数分区最多的topic(如果一个listener监听多个topic)。由于每个主题只有 10 个分区,因此您的实际并发数为 10。
覆盖监听器上的 concurrency
只是覆盖出厂设置;您总是至少需要与并发数一样多的分区。
When spring call the poll() method, does it poll from all three topics?
不是那种配置;你有 3 个并发容器,每个容器有 30 个消费者在听一个主题。您有 90 个消费者。
如果所有 3 个主题只有一个侦听器,则投票将 return 来自所有 3 个的记录;但是您仍然可能有 20 个空闲消费者,具体取决于分区分配器如何分配分区 - 请参阅日志 "partitions assigned" 以了解分区的确切分配方式。循环分配器应该可以分配它们。
will spring issue another poll at this time
Spring 无法控制 - 如果您花费的时间太长,Consumer 线程在侦听器中 - Consumer 不是线程安全的,因此我们无法发出异步轮询。
您必须在max.poll.interval.ms
内处理max.poll.records
以避免Kafka重新平衡分区。
ack模式没有区别;这一切都是为了及时处理投票结果。
我正在使用@KafkaListener 和 ConcurrentKafkaListenerContainerFactory 来收听 3 个 kafka 主题,每个主题有 10 个分区。我对这是如何工作的几乎没有疑问。
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(30);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
我的 listener.ackmode 是 return 并且 enable.auto.commit 设置为 false 并且 partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
1) 我对并发的理解是,因为我将并发(在工厂级别)设置为 30,并且我总共有 30 个分区(对于所有三个主题)要读取,每个线程都将被分配一个分区。我的理解正确吗?如果我在 @KafkaListener 注释中再次覆盖并发,它会有什么影响?
2) spring 调用 poll() 方法时,是否从所有三个主题进行轮询?
3) 由于我将 listener.ackmode 设置为 return,它是否会等到在单个 poll() 中 returned 的所有记录完成后再发出下一个民意调查()?另外,如果我的记录处理时间超过 max.poll.interval.ms 会怎样?假设在单个 poll() 调用中 return 编辑了 1-100 个偏移量,而我的代码在 max.poll.interval.ms 被命中之前只能处理 50 个,此时 spring 会发出另一个轮询因为它已经达到 max.poll.interval.ms?如果是这样,下一个 poll() return 会从偏移量 51 开始记录吗?
非常感谢您的时间和帮助
my listener.ackmode is return
没有这样的ackmode;因为你没有在工厂设置它,所以你的实际 ack 模式是 BATCH (默认)。要使用 ack 模式记录(如果那是你的意思),你必须配置工厂容器属性。
my understanding about concurrency is ...
您的理解有误;并发度不能大于分区数分区最多的topic(如果一个listener监听多个topic)。由于每个主题只有 10 个分区,因此您的实际并发数为 10。
覆盖监听器上的 concurrency
只是覆盖出厂设置;您总是至少需要与并发数一样多的分区。
When spring call the poll() method, does it poll from all three topics?
不是那种配置;你有 3 个并发容器,每个容器有 30 个消费者在听一个主题。您有 90 个消费者。
如果所有 3 个主题只有一个侦听器,则投票将 return 来自所有 3 个的记录;但是您仍然可能有 20 个空闲消费者,具体取决于分区分配器如何分配分区 - 请参阅日志 "partitions assigned" 以了解分区的确切分配方式。循环分配器应该可以分配它们。
will spring issue another poll at this time
Spring 无法控制 - 如果您花费的时间太长,Consumer 线程在侦听器中 - Consumer 不是线程安全的,因此我们无法发出异步轮询。
您必须在max.poll.interval.ms
内处理max.poll.records
以避免Kafka重新平衡分区。
ack模式没有区别;这一切都是为了及时处理投票结果。