Kafka 0.9.0 consumer: persist group between runs

I built the following Kafka consumer:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets are stored only if the code calls commitSync()/commitAsync()
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // has no effect while auto-commit is disabled
this.kconsumer = new KafkaConsumer<>(props);

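I want the consumer to start from the earliest offset when the group is first started. The first time I run it, it works exactly as expected, and as long as the subscription exists and the connection stays open, the offset keeps advancing.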

When I log in to the Kafka host and run the following:

./kafka-consumer-groups.sh --bootstrap-server localhost:6667 --new-consumer --group TEST1 --describe

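I see exactly the expected result: the offsets advance, and so on. But running the same command after the connection is closed returns "Consumer group TEST1 does not exist or is rebalancing." Except it is not rebalancing; it is simply gone.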

How do I keep the group around while the consumer is not running? Am I missing a configuration setting in the consumer or in Kafka?

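Also note that when I change the offset reset parameter to "latest", I get no records at all unless new records are loaded, even though the existing records have not expired.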
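The "latest" behaviour is consistent with how auto.offset.reset works: it is consulted only when the group has no committed offset for a partition, and "latest" then means "start after the last existing record". The following is a simplified, hypothetical model of that decision for a single partition (it is not Kafka's implementation, and it ignores cases such as a committed offset that has already fallen out of retention):

```java
import java.util.OptionalLong;

/**
 * Simplified model (not Kafka source code) of how a consumer chooses the
 * starting offset for one partition: a committed offset always wins, and
 * auto.offset.reset is consulted only when the group has no committed offset.
 */
public class OffsetResetModel {

    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            // The group already has a position for this partition: resume there.
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // oldest record still retained
            case "latest":
                return logEndOffset;   // only records produced after this point
            default:
                // With "none", Kafka raises an error instead of picking a position;
                // this model does the same for any other value.
                throw new IllegalStateException(
                        "No committed offset and reset policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // New group, records at offsets 0..99 still retained.
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0, 100));   // 100
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0, 100)); // 0
        // Group that previously committed offset 42: the reset policy is ignored.
        System.out.println(startingOffset(OptionalLong.of(42), "latest", 0, 100));    // 42
    }
}
```

Under this model, a consumer configured with "earliest" would resume from its committed position on later runs, provided offsets were actually committed.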

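So, bottom line: I want to start a new consumer with a given group name, have it pull from the earliest available record, close that consumer, and then, if I start a consumer with that name again, have it pull from where it left off. Any ideas about what I am missing? Or am I just misunderstanding how the high-level consumer works?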
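One thing worth checking: the configuration above sets ENABLE_AUTO_COMMIT_CONFIG to "false", and in that mode the consumer's position is stored only when the application commits it (for example with commitSync()). The toy simulation below does not use Kafka at all; the HashMap and the run method are hypothetical stand-ins used only to contrast a group that commits after each run with one that never commits:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy simulation (no Kafka involved) of repeated consumer runs in one group.
 * A HashMap stands in for the broker-side store of committed offsets.
 */
public class GroupRunsSimulation {

    // groupId -> next offset to read, i.e. the committed position.
    static final Map<String, Integer> committedOffsets = new HashMap<>();

    /**
     * One run: start at the committed offset (or 0, as with "earliest"),
     * read up to maxRecords, then commit the new position only if asked to.
     */
    static List<String> run(String groupId, List<String> log, int maxRecords, boolean commit) {
        int from = committedOffsets.getOrDefault(groupId, 0);
        int to = Math.min(log.size(), from + maxRecords);
        List<String> consumed = new ArrayList<>(log.subList(from, to));
        if (commit) {
            committedOffsets.put(groupId, to); // analogous to calling commitSync()
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("r0", "r1", "r2", "r3", "r4");

        // Committing group: the second run continues where the first stopped.
        System.out.println(run("TEST1", log, 3, true)); // [r0, r1, r2]
        System.out.println(run("TEST1", log, 3, true)); // [r3, r4]

        // Non-committing group (auto-commit off, no manual commit): every run starts over.
        System.out.println(run("TEST2", log, 3, false)); // [r0, r1, r2]
        System.out.println(run("TEST2", log, 3, false)); // [r0, r1, r2]
    }
}
```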

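In case anyone runs into this and wants to know what I did: I was able to set the offset reset policy after first determining whether the group exists. This means using "latest" if the group exists and "earliest" if it does not.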

    private void buildConsumer(String offset)
    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // ignored while auto-commit is disabled
        this.kconsumer = new KafkaConsumer<>(props);
    }

    /*
    Check whether the group has a committed offset before polling.
    If it does, keep the default reset policy and resume from the last commit.
    If it does not, rebuild the consumer with "earliest" to ensure all records are read.
    */
    private void groupExists(String topic)
    {
        // Only partition 0 is checked; other partitions of the topic are assumed to match
        TopicPartition toc = new TopicPartition(topic, 0);
        OffsetAndMetadata oam = kconsumer.committed(toc);
        if(oam != null){
            //do nothing, all is well, start from last commit
        } else {
            /*
            when a new group is started the AUTO_OFFSET_RESET_CONFIG
            needs to be set to earliest to ensure all records are picked up
            Since that property can only be set at instantiation the consumer
            must be rebuilt and resubscribed
            */
            this.kconsumer.close(); // release the old consumer before replacing it
            buildConsumer("earliest");
            this.kconsumer.subscribe(Arrays.asList(topic));
        }
    }
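The groupExists method inspects only partition 0, so on a topic with several partitions it could treat the group as existing while other partitions have no commit. Below is a hedged sketch of checking every partition. It runs without a broker because a Map stands in for the results of kconsumer.committed(new TopicPartition(topic, p)); in real code the partition count might come from kconsumer.partitionsFor(topic).size(). The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical generalization of the groupExists check to every partition.
 * committedByPartition stands in for kconsumer.committed(...) results;
 * a missing entry plays the role of a null OffsetAndMetadata.
 */
public class ResetPolicyChooser {

    static String resetPolicyFor(Map<Integer, Long> committedByPartition, int partitionCount) {
        for (int p = 0; p < partitionCount; p++) {
            if (committedByPartition.get(p) == null) {
                // At least one partition has no commit. "earliest" only affects such
                // partitions; partitions with commits still resume from them.
                return "earliest";
            }
        }
        return "latest"; // every partition has a committed offset
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 17L);
        // Partition 0 alone looks fine, but partition 1 has never been committed.
        System.out.println(resetPolicyFor(committed, 2)); // earliest
        committed.put(1, 5L);
        System.out.println(resetPolicyFor(committed, 2)); // latest
    }
}
```

Returning "earliest" whenever any partition lacks a commit relies on the reset policy never overriding an existing committed offset.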