在 Kafka 的消费者组内重置 kafka LAG(更改偏移量)-python
Reset kafka LAG (change offset) within consumer group in Kafka-python
我在使用 kafka-consumer-groups.sh 工具 but I am needing to reset it within the application. I found this example, but it doesn't seem to reset it. 示例
重置我的 LAG 的地方找到了这个
consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
enable_auto_commit=False,
group_id="MyTopic.group")
consumer.poll()
consumer.seek_to_end()
consumer.commit()
... continue on with other code...
运行 bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group MyTopic.group --describe
仍然显示两个分区都有 LAG。我怎样才能让当前偏移量到 "fast-foward" 到最后?
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
MyTopic 0 52110 66195 14085 kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22 /Host1 kafka-python-1.4.2
MyTopic 1 52297 66565 14268 kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b /Host2 kafka-python-1.4.2
为了"fast forward"消费组的offset,也就是清除LAG,需要创建新的消费者加入同一个组。
控制台命令是:
kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic <topicName> --consumer-property group.id=<groupName>
同时,您可以 运行 命令来查看您描述的滞后,您会看到滞后已消除。
您可能需要这个:
def consumer_from_offset(topic, group_id, offset):
"""return the consumer from a certain offset"""
consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id=group_id)
tp = TopicPartition(topic=topic, partition=0)
consumer.assign([tp])
consumer.seek(tp, offset)
return consumer
consumer = consumer_from_offset('topic', 'group', 0)
for msg in consumer:
# it will consume the msg beginning from offset 0
print(msg)
我在使用 kafka-consumer-groups.sh 工具
consumer = KafkaConsumer("MyTopic", bootstrap_servers=self.kafka_server + ":" + str(self.kafka_port),
enable_auto_commit=False,
group_id="MyTopic.group")
consumer.poll()
consumer.seek_to_end()
consumer.commit()
... continue on with other code...
运行 bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group MyTopic.group --describe
仍然显示两个分区都有 LAG。我怎样才能让当前偏移量到 "fast-foward" 到最后?
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
MyTopic 0 52110 66195 14085 kafka-python-1.4.2-6afb6901-c651-4534-a482-15358db42c22 /Host1 kafka-python-1.4.2
MyTopic 1 52297 66565 14268 kafka-python-1.4.2-c70e0a71-7d61-46a1-97bc-aa2726a8109b /Host2 kafka-python-1.4.2
为了"fast forward"消费组的offset,也就是清除LAG,需要创建新的消费者加入同一个组。
控制台命令是:
kafka-console-consumer.sh --bootstrap-server <brokerIP>:9092 --topic <topicName> --consumer-property group.id=<groupName>
同时,您可以 运行 命令来查看您描述的滞后,您会看到滞后已消除。
您可能需要这个:
def consumer_from_offset(topic, group_id, offset):
"""return the consumer from a certain offset"""
consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id=group_id)
tp = TopicPartition(topic=topic, partition=0)
consumer.assign([tp])
consumer.seek(tp, offset)
return consumer
consumer = consumer_from_offset('topic', 'group', 0)
for msg in consumer:
# it will consume the msg beginning from offset 0
print(msg)