kafka consumer seek is not working: AssertionError: Unassigned partition
kafka consumer seek is not working: AssertionError: Unassigned partition
当我尝试从我的主题接收消息时,下面定义的 kafka 消费者 con
工作得很好;但是,当我尝试使用 seek
方法或其任何变体更改偏移量时,这给我带来了麻烦。即 seek_to_beginning
、seek_to_end
from kafka import KafkaConsumer, TopicPartition
con = KafkaConsumer(my_topic, bootstrap_servers = my_bootstrapservers, group_id = my_groupid)
p = con.partitions_for_topic(my_topic)
my_partition = p.pop()
tp = TopicPartition(topic = my_topic, partition = my_partition)
print ('*** tp: ', tp)
con.seek_to_beginning(tp)
它生成以下输出和以下错误:
*** tp: TopicPartition(topic='mytopic', partition=0)
File "/myhome/anaconda3/lib/python3.6/site-packages/kafka/consumer/group.py", line 735, in seek_to_beginning
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
'Unassigned partition' 错误对我来说似乎无效,因为我只是从消费者本身获取分区和 TopicPartition,然后将其传回 seek 方法,因此它肯定已分配。
有什么想法吗?
通过更改分区分配给消费者的方式,我找到了一种查找主题分区的方法。问题似乎是消费者无法使用默认订阅的主题分区,所以我不得不更改构造函数以防止默认订阅主题,然后 显式分配一个分区给我的消费者.
from kafka import KafkaConsumer, TopicPartition
con = KafkaConsumer(bootstrap_servers = my_bootstrapservers)
tp = TopicPartition(my_topic, 0)
con.assign([tp])
con.seek_to_beginning()
con.seek(tp, 1000000)
当我尝试从我的主题接收消息时,下面定义的 kafka 消费者 con
工作得很好;但是,当我尝试使用 seek
方法或其任何变体更改偏移量时,这给我带来了麻烦。即 seek_to_beginning
、seek_to_end
from kafka import KafkaConsumer, TopicPartition
con = KafkaConsumer(my_topic, bootstrap_servers = my_bootstrapservers, group_id = my_groupid)
p = con.partitions_for_topic(my_topic)
my_partition = p.pop()
tp = TopicPartition(topic = my_topic, partition = my_partition)
print ('*** tp: ', tp)
con.seek_to_beginning(tp)
它生成以下输出和以下错误:
*** tp: TopicPartition(topic='mytopic', partition=0)
File "/myhome/anaconda3/lib/python3.6/site-packages/kafka/consumer/group.py", line 735, in seek_to_beginning
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
'Unassigned partition' 错误对我来说似乎无效,因为我只是从消费者本身获取分区和 TopicPartition,然后将其传回 seek 方法,因此它肯定已分配。 有什么想法吗?
通过更改分区分配给消费者的方式,我找到了一种查找主题分区的方法。问题似乎是消费者无法使用默认订阅的主题分区,所以我不得不更改构造函数以防止默认订阅主题,然后 显式分配一个分区给我的消费者.
from kafka import KafkaConsumer, TopicPartition
con = KafkaConsumer(bootstrap_servers = my_bootstrapservers)
tp = TopicPartition(my_topic, 0)
con.assign([tp])
con.seek_to_beginning()
con.seek(tp, 1000000)