Logstash kafka 输入插件无法读取新消费者的任何消息,也无法将 auto_offset_reset 设置为最早

Logstash kafka input plugin unable to read any messages with new consumer and by setting auto_offset_reset to earliest

我正在使用 Logstash Kafka 输入插件从主题中读取消息。我早些时候能够启动新的消费者-属于新的消费者群体,并且通过设置 auto_offset_reset=earliest 能够从主题开始消费消息。

插件配置:

input {     
    kafka {         
    bootstrap_servers => "localhost:9092"
        topics => ["test_topic"]
        group_id => "new_consumer"
        client_id => "new_consumer"
        consumer_threads => 1
        auto_offset_reset => "earliest"   
  } 
}

但现在我注意到一个奇怪的行为。尽管这是属于新消费者组的新消费者并且 auto_offset_reset 设置为 'earliest',但我无法消费任何消息。

启用的调试日志如下是行为: 它清楚地表明消费者没有以前的偏移量,突然获取了分区偏移量,消费者使用它并设置了它的新偏移量(请注意:之前从主题中读取了 36387 条消息,因此下面的日志中有数字)

[2016-12-22T16:45:13,454][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Successfully joined group new_consumer with generation 1

[2016-12-22T16:45:13,455][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions [test_topic-0] for group new_consumer

[2016-12-22T16:45:13,456][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group new_consumer fetching committed offsets for partitions: [test_topic-0]

[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group new_consumer has no committed offset for partition test_topic-0

[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] Resetting offset for partition test_topic-0 to earliest offset.

[2016-12-22T16:45:13,546][DEBUG][org.apache.kafka.clients.NetworkClient] Initiating connection to node 0 at localhost:9092.

[2016-12-22T16:45:13,657][DEBUG][logstash.instrument.collector] Collector: Sending snapshot to observers {:created_at=>2016-12-22 16:45:13 -0800}

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-sent

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-received

[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.latency

[2016-12-22T16:45:13,742][DEBUG][org.apache.kafka.clients.NetworkClient] Completed connection to node 0

[2016-12-22T16:45:13,901][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] Fetched offset 36387 for partition test_topic-0

[2016-12-22T16:45:18,050][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group newconsumer committed offset 36387 for partition test_topic-0

[2016-12-22T16:45:18,563][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group newconsumer committed offset 36387 for partition test_topic-0

谁能告诉我为什么我们会看到这种行为?

是否可以根据配置的保留期限删除旧消息?可能偏移量 36387 是最早的偏移量,所有更早的消息都已过期。默认保留期为 7 天。