Spring kafka 消费记录有一些延迟

Spring kafka consume records with some delay

我在我的应用程序中使用 spring kafka。我想添加一些 15 分钟的延迟来消耗其中一个侦听器 - kafkaRetryListenerContainerFactory 的记录。我有两个听众。以下是我的配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaRetryListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(retryConsumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);       
    return factory;
}

Kafka 重试监听器:

@KafkaListener(topics = "${spring.kafka.retry.topic}", groupId = "${spring.kafka.consumer-group-id}", 
        containerFactory = "kafkaRetryListenerContainerFactory", id = "retry.id")
public void retryMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
    Thread.sleep(900000);
    LOG.info(String.format("Consumed retry message -> %s", record.toString()));
    acknowledgment.acknowledge();
}

当我添加 Thread.sleep() 时,我在日志中收到持续的重新平衡错误

Attempt to heartbeat failed since group is rebalancing

我的springkafka版本是2.3.4

以下是配置值:

max.poll.interval.ms = 1200000(高于thread.sleep)

heartbeat.interval.ms = 3000

session.timeout.ms = 10000

我试过了ack.nack(900000);仍然出现重新平衡错误

任何帮助将不胜感激

过滤器不是正确的方法;您需要 Thread.sleep() 线程并确保 max.poll.interval.ms 大于轮询收到的记录的总睡眠和处理时间。

在 2.3 中,容器可以选择在轮询之间休眠;对于早期版本,您必须自己睡眠。

编辑

我刚刚在我的 server.properties 中发现了这个(Mac OS 上的自制程序):

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

这解释了为什么我们看到分区最初分配给第一个消费者(见下面的评论)。

将其设置回默认值 3000 对我来说很有效。