运行 没有指定分区的kafka消费者

Run kafka consumer without specifying partition

最近在学习Kafka,我的consumer消费不到任何记录,除非我指定--parititon 0参数。换句话说,我不能使用像这样的记录:

kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic 

但工作方式如下:

kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic --partition 0

主要问题是,当我转到 java 代码时,我的 KafkaConsumer class 无法获取记录,我需要知道如何指定 partition java KafkaConsumer 中的数字 ?!

我当前的 java 代码是:


public class ConsumerDemo {

    public static void main(String[] args) {

        Logger logger = LoggerFactory.getLogger((ConsumerDemo.class.getName()));

        String bootstrapServer = "127.0.0.10:9092";
        String groupId = "my-kafka-java-app";
        String topic = "first-topic";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        //properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer to our topic
        consumer.subscribe(Collections.singleton(topic)); //means subscribtion to one topic

        // poll for new data
        while(true){
            //consumer.poll(100); old way
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records){
                logger.info("Key: " + record.key() + ", Value: "+ record.value() );
                logger.info("Partition: " + record.partition() + ", Offset: "+ record.offset());
            }

        }

    }
}

顾名思义,ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG属性的目的是配置一个Partition Assignment Strategy而不是按照命令行的指示设置一个固定的分区。

使用的默认策略是可以更改的 RangeAssignor,例如 StickyAssignor,如下所示:

properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,StickyAssignor.class.getName());

您可以阅读有关 Kafka Client Side Assignment Proposal 的更多信息。

你的做法是正确的。订阅主题时不需要指定分区。也许你的消费组已经消费了topic中的所有消息,并且提交了最新的offsets。

确保当您 运行 您的应用程序或创建一个新的消费者组从头开始使用时正在生成新消息(如果您将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为“最早”)

经过大量检查,我的解决方案是使用 consumer.assignconsumer.seek 而不是 consumer.subscribe 并且没有指定 groupId。但是我觉得应该有更优的方案

java 代码如下:

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // subscribe consumer to our topic
        //consumer.subscribe(Collections.singleton(topic)); //means subscription to one topic
        // using assign and Seek, are mostly used to replay data or fetch a specific msg
        TopicPartition  partitionToReadFrom = new TopicPartition(topic, 0);
        long offsetToReadFrom = 15L;
        // assign
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek: for a specific offset to read from
        consumer.seek(partitionToReadFrom, offsetToReadFrom);