Kafka 在 Consumer.Poll(Duration.ZERO) 之后没有分配分区;

Kafka is not assigning a partition after Consumer.Poll(Duration.ZERO);


我开始了一个项目,我在其中实现了 appache kafka。
我已经有一个工作的生产者将数据写入队列。到目前为止,一切都很好。现在我想编写一个读取队列中所有数据的消费者。
即对应代码:
try {
    consumer.subscribe(Collections.singletonList("names"));
    if (startingPoint != null){
        consumer.
        consumer.poll(Duration.ofMillis(0));
        consumer.seekToBeginning(consumer.assignment());
    }
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            keyValuePairs.add(new String[]{record.key(),record.value()});
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}

该代码现在无法正常工作。仅消耗新记录。 我能够发现 seekToBeginning() 不起作用,因为那一刻没有分区分配给消费者。 如果我增加投票的持续时间,它就会起作用。另一方面,如果我只是暂停线程,它不会。

有人可以解释一下为什么会这样吗?我试图自己找出答案,并且已经阅读了一些有关 Kafka 心跳的内容。但是我还没有完全理解到底发生了什么。

作业需要时间;轮询 0 通常意味着轮询将在它发生之前退出。

您应该向 subscribe() 方法添加一个 ConsumerRebalanceListener 回调并在 onPartitionsAssigned() 中执行查找。

编辑

@SpringBootApplication
public class So69121558Application {

    public static void main(String[] args) {
        SpringApplication.run(So69121558Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ConsumerFactory<String, String> cf, KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so69121558", "test");
            Consumer<String, String> consumer = cf.createConsumer("group", "");
            consumer.subscribe(Collections.singletonList("so69121558"), new ConsumerRebalanceListener() {

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    consumer.seekToBeginning(partitions);
                }

            });
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(System.out::println);
            Thread.sleep(5000);
            consumer.close();
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
    }

}

这里有几个以 Spring 方式进行操作的示例 - 只需将其中一个(或两个)添加到上面的 class.

@KafkaListener(id = "so69121558", topics = "so69121558")
void listen(ConsumerRecord<?, ?> rec) {
    System.out.println(rec);
}

@KafkaListener(id = "so69121558-1", topics = "so69121558")
void pojoListen(String in) {
    System.out.println(in);
}

查找的方式也有点不同;这是完整的示例:

@SpringBootApplication
public class So69121558Application extends AbstractConsumerSeekAware {

    public static void main(String[] args) {
        SpringApplication.run(So69121558Application.class, args);
    }

    @KafkaListener(id = "so69121558", topics = "so69121558")
    void listen(ConsumerRecord<?, ?> rec) {
        System.out.println(rec);
    }

    @KafkaListener(id = "so69121558-1", topics = "so69121558")
    void pojoListen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69121558").partitions(1).replicas(1).build();
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }


}