Kafka Consumer 没有获取所有消息
Kafka Consumer not fetching all messages
每当在 Kafka 中创建新主题时,我都尝试启动动态消费者,但动态启动的消费者始终缺少 starting/first 消息,但从那里开始消费消息。我正在使用 kafka-python 模块并使用更新的 KafkaConsumer 和 KafkaProducer。
生产者代码是
producer = KafkaProducer(bootstrap_servers='localhost:9092')
record_metadata = producer.send(topic, data)
消费者代码为
consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
请提出一些建议来解决这个问题或我必须包含在我的生产者和消费者实例中的任何配置。
你能把auto_offset_reset设为最早吗
创建新的消费者流时,它从最新的偏移量(这是 auto_offset_reset 的默认值)开始,您将错过消费者未启动时发送的消息。
您可以在 kafka python doc 中阅读相关信息。相关部分如下
auto_offset_reset (str) – A policy for resetting offsets on
OffsetOutOfRange errors: ‘earliest’ will move to the oldest available
message, ‘latest’ will move to the most recent. Any ofther value will
raise the exception. Default: ‘latest’.
每当在 Kafka 中创建新主题时,我都尝试启动动态消费者,但动态启动的消费者始终缺少 starting/first 消息,但从那里开始消费消息。我正在使用 kafka-python 模块并使用更新的 KafkaConsumer 和 KafkaProducer。
生产者代码是
producer = KafkaProducer(bootstrap_servers='localhost:9092')
record_metadata = producer.send(topic, data)
消费者代码为
consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
请提出一些建议来解决这个问题或我必须包含在我的生产者和消费者实例中的任何配置。
你能把auto_offset_reset设为最早吗
创建新的消费者流时,它从最新的偏移量(这是 auto_offset_reset 的默认值)开始,您将错过消费者未启动时发送的消息。
您可以在 kafka python doc 中阅读相关信息。相关部分如下
auto_offset_reset (str) – A policy for resetting offsets on OffsetOutOfRange errors: ‘earliest’ will move to the oldest available message, ‘latest’ will move to the most recent. Any ofther value will raise the exception. Default: ‘latest’.