如何在 Pykafka simpleconsumer 中 select 起始偏移量?
How to select starting offset in Pykafka simpleconsumer?
在我的 kafka 集群单分区主题中,我有一个简单的消费者处理所有传入的消息,如果处理的数据出现错误,我想以相同的顺序重新处理来自某个偏移量(不是开头)的所有消息,以修复不一致并保持来自 kafka 的消息的原始有序序列。
有没有办法在 Pykafka 中做到这一点?我还没弄清楚
您需要致电reset_offsets()
。例如:
consumer = topic.get_simple_consumer(consumer_group="example")
partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.itervalues()]
# because we passed in a consumer_group the new offsets will be saved in Kafka
consumer.reset_offsets(partition_offsets=partition_offset_pairs)
(其中 get_offset_for_partition()
是您定义的函数)。或者对于单分区主题:
# read from offset 123456
consumer = topic.get_simple_consumer()
partition = topic.partitions[0]
consumer.reset_offsets([(partition, 123456)])
同样的 reset_offsets()
方法也适用于 BalancedConsumer
& ManagedBalanceConsumer
类。
请注意,作为 Kafka 设计的一部分,消息只能独立保证每个主题分区的顺序。
在我的 kafka 集群单分区主题中,我有一个简单的消费者处理所有传入的消息,如果处理的数据出现错误,我想以相同的顺序重新处理来自某个偏移量(不是开头)的所有消息,以修复不一致并保持来自 kafka 的消息的原始有序序列。
有没有办法在 Pykafka 中做到这一点?我还没弄清楚
您需要致电reset_offsets()
。例如:
consumer = topic.get_simple_consumer(consumer_group="example")
partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.itervalues()]
# because we passed in a consumer_group the new offsets will be saved in Kafka
consumer.reset_offsets(partition_offsets=partition_offset_pairs)
(其中 get_offset_for_partition()
是您定义的函数)。或者对于单分区主题:
# read from offset 123456
consumer = topic.get_simple_consumer()
partition = topic.partitions[0]
consumer.reset_offsets([(partition, 123456)])
同样的 reset_offsets()
方法也适用于 BalancedConsumer
& ManagedBalanceConsumer
类。
请注意,作为 Kafka 设计的一部分,消息只能独立保证每个主题分区的顺序。