Spring kafka 消费记录有一些延迟
Spring kafka consume records with some delay
我在我的应用程序中使用 spring kafka。我想添加一些 15 分钟的延迟来消耗其中一个侦听器 - kafkaRetryListenerContainerFactory 的记录。我有两个听众。以下是我的配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaRetryListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(retryConsumerFactory());
factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
factory.setAutoStartup(true);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
return factory;
}
Kafka 重试监听器:
@KafkaListener(topics = "${spring.kafka.retry.topic}", groupId = "${spring.kafka.consumer-group-id}",
containerFactory = "kafkaRetryListenerContainerFactory", id = "retry.id")
public void retryMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
Thread.sleep(900000);
LOG.info(String.format("Consumed retry message -> %s", record.toString()));
acknowledgment.acknowledge();
}
当我添加 Thread.sleep() 时,我在日志中收到持续的重新平衡错误
Attempt to heartbeat failed since group is rebalancing
我的springkafka版本是2.3.4
以下是配置值:
max.poll.interval.ms = 1200000(高于thread.sleep)
heartbeat.interval.ms = 3000
session.timeout.ms = 10000
我试过了ack.nack(900000);仍然出现重新平衡错误
任何帮助将不胜感激
过滤器不是正确的方法;您需要 Thread.sleep()
线程并确保 max.poll.interval.ms
大于轮询收到的记录的总睡眠和处理时间。
在 2.3 中,容器可以选择在轮询之间休眠;对于早期版本,您必须自己睡眠。
编辑
我刚刚在我的 server.properties
中发现了这个(Mac OS 上的自制程序):
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
这解释了为什么我们看到分区最初分配给第一个消费者(见下面的评论)。
将其设置回默认值 3000 对我来说很有效。
我在我的应用程序中使用 spring kafka。我想添加一些 15 分钟的延迟来消耗其中一个侦听器 - kafkaRetryListenerContainerFactory 的记录。我有两个听众。以下是我的配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaRetryListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(retryConsumerFactory());
factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
factory.setAutoStartup(true);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
return factory;
}
Kafka 重试监听器:
@KafkaListener(topics = "${spring.kafka.retry.topic}", groupId = "${spring.kafka.consumer-group-id}",
containerFactory = "kafkaRetryListenerContainerFactory", id = "retry.id")
public void retryMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
Thread.sleep(900000);
LOG.info(String.format("Consumed retry message -> %s", record.toString()));
acknowledgment.acknowledge();
}
当我添加 Thread.sleep() 时,我在日志中收到持续的重新平衡错误
Attempt to heartbeat failed since group is rebalancing
我的springkafka版本是2.3.4
以下是配置值:
max.poll.interval.ms = 1200000(高于thread.sleep)
heartbeat.interval.ms = 3000
session.timeout.ms = 10000
我试过了ack.nack(900000);仍然出现重新平衡错误
任何帮助将不胜感激
过滤器不是正确的方法;您需要 Thread.sleep()
线程并确保 max.poll.interval.ms
大于轮询收到的记录的总睡眠和处理时间。
在 2.3 中,容器可以选择在轮询之间休眠;对于早期版本,您必须自己睡眠。
编辑
我刚刚在我的 server.properties
中发现了这个(Mac OS 上的自制程序):
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
这解释了为什么我们看到分区最初分配给第一个消费者(见下面的评论)。
将其设置回默认值 3000 对我来说很有效。