Logstash kafka 输入插件无法读取新消费者的任何消息,也无法将 auto_offset_reset 设置为最早
Logstash kafka input plugin unable to read any messages with new consumer and by setting auto_offset_reset to earliest
我正在使用 Logstash Kafka 输入插件从主题中读取消息。我早些时候能够启动新的消费者-属于新的消费者群体,并且通过设置 auto_offset_reset=earliest 能够从主题开始消费消息。
插件配置:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test_topic"]
group_id => "new_consumer"
client_id => "new_consumer"
consumer_threads => 1
auto_offset_reset => "earliest"
}
}
但现在我注意到一个奇怪的行为。尽管这是属于新消费者组的新消费者并且 auto_offset_reset 设置为 'earliest',但我无法消费任何消息。
启用的调试日志如下是行为:
它清楚地表明消费者没有以前的偏移量,突然获取了分区偏移量,消费者使用它并设置了它的新偏移量(请注意:之前从主题中读取了 36387 条消息,因此下面的日志中有数字)
[2016-12-22T16:45:13,454][INFO
][org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
Successfully joined group new_consumer with generation 1
[2016-12-22T16:45:13,455][INFO
][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
Setting newly assigned partitions [test_topic-0] for group
new_consumer
[2016-12-22T16:45:13,456][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
Group new_consumer fetching committed offsets for partitions:
[test_topic-0]
[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
Group new_consumer has no committed offset for partition test_topic-0
[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher]
Resetting offset for partition test_topic-0 to earliest offset.
[2016-12-22T16:45:13,546][DEBUG][org.apache.kafka.clients.NetworkClient]
Initiating connection to node 0 at localhost:9092.
[2016-12-22T16:45:13,657][DEBUG][logstash.instrument.collector]
Collector: Sending snapshot to observers {:created_at=>2016-12-22
16:45:13 -0800}
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics]
Added sensor with name node-0.bytes-sent
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics]
Added sensor with name node-0.bytes-received
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics]
Added sensor with name node-0.latency
[2016-12-22T16:45:13,742][DEBUG][org.apache.kafka.clients.NetworkClient]
Completed connection to node 0
[2016-12-22T16:45:13,901][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher]
Fetched offset 36387 for partition test_topic-0
[2016-12-22T16:45:18,050][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
Group newconsumer committed offset 36387 for partition test_topic-0
[2016-12-22T16:45:18,563][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
Group newconsumer committed offset 36387 for partition test_topic-0
谁能告诉我为什么我们会看到这种行为?
是否可以根据配置的保留期限删除旧消息?可能偏移量 36387 是最早的偏移量,所有更早的消息都已过期。默认保留期为 7 天。
我正在使用 Logstash Kafka 输入插件从主题中读取消息。我早些时候能够启动新的消费者-属于新的消费者群体,并且通过设置 auto_offset_reset=earliest 能够从主题开始消费消息。
插件配置:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test_topic"]
group_id => "new_consumer"
client_id => "new_consumer"
consumer_threads => 1
auto_offset_reset => "earliest"
}
}
但现在我注意到一个奇怪的行为。尽管这是属于新消费者组的新消费者并且 auto_offset_reset 设置为 'earliest',但我无法消费任何消息。
启用的调试日志如下是行为: 它清楚地表明消费者没有以前的偏移量,突然获取了分区偏移量,消费者使用它并设置了它的新偏移量(请注意:之前从主题中读取了 36387 条消息,因此下面的日志中有数字)
[2016-12-22T16:45:13,454][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Successfully joined group new_consumer with generation 1
[2016-12-22T16:45:13,455][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions [test_topic-0] for group new_consumer
[2016-12-22T16:45:13,456][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group new_consumer fetching committed offsets for partitions: [test_topic-0]
[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group new_consumer has no committed offset for partition test_topic-0
[2016-12-22T16:45:13,544][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] Resetting offset for partition test_topic-0 to earliest offset.
[2016-12-22T16:45:13,546][DEBUG][org.apache.kafka.clients.NetworkClient] Initiating connection to node 0 at localhost:9092.
[2016-12-22T16:45:13,657][DEBUG][logstash.instrument.collector] Collector: Sending snapshot to observers {:created_at=>2016-12-22 16:45:13 -0800}
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-sent
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.bytes-received
[2016-12-22T16:45:13,741][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name node-0.latency
[2016-12-22T16:45:13,742][DEBUG][org.apache.kafka.clients.NetworkClient] Completed connection to node 0
[2016-12-22T16:45:13,901][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher] Fetched offset 36387 for partition test_topic-0
[2016-12-22T16:45:18,050][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group newconsumer committed offset 36387 for partition test_topic-0
[2016-12-22T16:45:18,563][DEBUG][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Group newconsumer committed offset 36387 for partition test_topic-0
谁能告诉我为什么我们会看到这种行为?
是否可以根据配置的保留期限删除旧消息?可能偏移量 36387 是最早的偏移量,所有更早的消息都已过期。默认保留期为 7 天。