AssertionError: Unassigned partition
AssertionError: Unassigned partition
我试图通过设置偏移量来使用主题中的数据,但出现断言错误 -
from kafka import KafkaConsumer
consumer = KafkaConsumer('foobar1',
bootstrap_servers=['localhost:9092'])
print 'process started'
print consumer.partitions_for_topic('foobar1')
print 'done'
consumer.seek(0,10)
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
print 'process ended'
错误:-
Traceback (most recent call last):
File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module>
consumer.seek(0,10)
File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
在调用 seek 之前,您必须使用 TopicPartition 列表调用 consumer.assign()。
另请注意,seek 的第一个参数也是一个 TopicPartition。
参见 KafkaConsumer API
在我使用 Kafka 0.9
和 kafka-python
的情况下,分区分配发生在 for message in consumer
期间。因此,搜索操作应该在迭代之后。我通过以下代码重置了我的组的偏移量:
import kafka
ps = []
for i in xrange(topic_partition_number):
ps.append(kafka.TopicPartition(topic, i))
consumer = kafka.KafkaConsumer(topic, bootstrap_servers=address, group_id=group)
for msg in consumer:
print msg
consumer.seek_to_beginning(*ps)
consumer.commit()
break
下面是解决问题的例子:
from kafka import KafkaConsumer, TopicPartition
con = KafkaConsumer(bootstrap_servers = my_bootstrapservers)
tp = TopicPartition(my_topic, 0)
con.assign([tp])
con.seek_to_beginning()
con.seek(tp, 1000000)
参考:
我试图通过设置偏移量来使用主题中的数据,但出现断言错误 -
from kafka import KafkaConsumer
consumer = KafkaConsumer('foobar1',
bootstrap_servers=['localhost:9092'])
print 'process started'
print consumer.partitions_for_topic('foobar1')
print 'done'
consumer.seek(0,10)
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
print 'process ended'
错误:-
Traceback (most recent call last):
File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module>
consumer.seek(0,10)
File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
在调用 seek 之前,您必须使用 TopicPartition 列表调用 consumer.assign()。 另请注意,seek 的第一个参数也是一个 TopicPartition。 参见 KafkaConsumer API
在我使用 Kafka 0.9
和 kafka-python
的情况下,分区分配发生在 for message in consumer
期间。因此,搜索操作应该在迭代之后。我通过以下代码重置了我的组的偏移量:
import kafka
ps = []
for i in xrange(topic_partition_number):
ps.append(kafka.TopicPartition(topic, i))
consumer = kafka.KafkaConsumer(topic, bootstrap_servers=address, group_id=group)
for msg in consumer:
print msg
consumer.seek_to_beginning(*ps)
consumer.commit()
break
下面是解决问题的例子:
from kafka import KafkaConsumer, TopicPartition
con = KafkaConsumer(bootstrap_servers = my_bootstrapservers)
tp = TopicPartition(my_topic, 0)
con.assign([tp])
con.seek_to_beginning()
con.seek(tp, 1000000)
参考: