Consumer fetch data returns OFFSET_OUT_OF_RANGE

I have a cluster of 3 Kafka brokers with a topic named fallback_topic. Only one consumerGroup consumes from this topic, and that consumerGroup contains a single consumer.

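After publishing a few messages, I can see that they reached Kafka: the LogSize advanced with the new messages. However, the Consumer Offset stays the same and no messages are consumed.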

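Below is the log from when consumer.poll(3000) runs. Partitions 4, 7, and 10 received new messages from the producer, but when the consumer tries to read them, the fetch reports error=OFFSET_OUT_OF_RANGE.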

04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Node 654000 sent a full fetch response that created a new incremental fetch session 685508830 with 7 response partition(s)
04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Fetch READ_UNCOMMITTED at offset 1062 for partition fallback_topic-1 returned fetch data (error=NONE, highWaterMark=1062, lastStableOffset = -1, logStartOffset = 1062, abortedTransactions = null, recordsSizeInBytes=0)
04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Fetch READ_UNCOMMITTED at offset 124094 for partition fallback_topic-4 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Fetch READ_UNCOMMITTED at offset 762 for partition fallback_topic-7 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
04:20:41.311 [kafka-coordinator-heartbeat-thread | uniqueConsumerGroup] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=uniqueConsumerGroup] Fetch READ_UNCOMMITTED at offset 897 for partition fallback_topic-10 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)

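My understanding is that this error occurs when the partition leader has advanced its offsets and a follower has not. But no broker went down, so the consumer has been talking to the same leader the whole time. Can anyone help explain why the OFFSET_OUT_OF_RANGE error occurs? Thanks very much. Below is my code; I left out consumer.commitAsync() because the problem happens before the commit.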

        List<Event> events = new ArrayList<Event>();
        consumer.subscribe(Arrays.asList("fallback_topic"));
        ConsumerRecords<String, byte[]> records;
        
        do {
            logger.info("Start polling messages from " + topic);
            records = consumer.poll(3000);

            logger.info("done polling.");
            records.partitions().forEach(tp -> logger.info("found records from "+tp.topic()+"-"+tp.partition()));
            for (ConsumerRecord<String, byte[]> record : records) {
                Event event = EventKafkaSerializer.serializer.deserializeEvent(new ByteArrayInputStream(record.value()));
                logger.info(event.getId()+" "+event.getData().toString());
                events.add(event);
            }
           
        } while(records.count()>0);
        
        logger.info("Found total events "+events.size());

I found the cause.

I had forgotten to call consumer.close() at the end.
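
For anyone who hits the same thing, here is a minimal sketch of one way to make sure close() always runs once the poll loop finishes, even if deserialization throws. It does not talk to a real broker: RecordSource and FakeSource are hypothetical stand-ins I made up so the example runs on its own. KafkaConsumer implements Closeable, so the same try-with-resources shape should apply to it, but treat this as an illustration of the pattern rather than a drop-in replacement for the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the consumer (not a Kafka type).
interface RecordSource extends AutoCloseable {
    List<String> poll(long timeoutMs);

    @Override
    void close(); // narrowed so callers need not handle a checked exception
}

// Hypothetical in-memory source that hands out pre-loaded batches.
class FakeSource implements RecordSource {
    private final Deque<List<String>> batches = new ArrayDeque<>();
    boolean closed = false;

    FakeSource(List<List<String>> input) {
        batches.addAll(input);
    }

    @Override
    public List<String> poll(long timeoutMs) {
        List<String> next = batches.poll(); // null once every batch is consumed
        return next == null ? new ArrayList<>() : next;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ClosingPollLoop {
    // Polls until a poll comes back empty, mirroring the do/while loop in the question.
    static List<String> drain(RecordSource source) {
        List<String> events = new ArrayList<>();
        // try-with-resources calls close() on normal exit and on exceptions.
        try (RecordSource s = source) {
            List<String> records;
            do {
                records = s.poll(3000);
                events.addAll(records);
            } while (!records.isEmpty());
        }
        return events;
    }

    public static void main(String[] args) {
        FakeSource source = new FakeSource(
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        List<String> events = drain(source);
        System.out.println("Found total events " + events.size()
                + ", closed=" + source.closed);
    }
}
```

With the real client, the consumer would take the place of source in the try header. A try/finally block that calls consumer.close() in the finally clause is an equivalent alternative when the consumer is created elsewhere.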