TypeError: partitions must be TopicPartition namedtuples
I want to use KafkaConsumer from kafka-python to consume the first N messages in a topic:
from kafka import KafkaConsumer as kc
import json

bootstrap_servers = ['xx.xxx.xx.xxx:9092']
topic_name = 'my_topic_name'
consumer = kc(topic_name, group_id='group1', bootstrap_servers=bootstrap_servers,
              auto_offset_reset='earliest', auto_commit_interval_ms=1000,
              enable_auto_commit=True,
              value_deserializer=lambda x: json.loads(x.decode('utf-8')))
count = 0
consumer.seek_to_beginning((topic_name,0))
kjson = []
for msg in consumer:
    if count < 10:
        count = count + 1
        kjson.append(msg.value)
    else:
        print(json.dumps(kjson, indent=4))
        break
The line consumer.seek_to_beginning((topic_name,0)) raises the error above. The documentation specifies:
seek_to_beginning(*partitions)[source]
Seek to the oldest available offset for partitions.
Parameters: *partitions – Optionally provide specific TopicPartitions, otherwise default to all assigned partitions.
Raises: AssertionError – If any partition is not currently assigned, or if no partitions are assigned.
This topic has 32 partitions (indexed from 0 to 31). What is the correct syntax to seek to the beginning?
As the error message says, the arguments must be TopicPartition namedtuples. For example:
from kafka.structs import TopicPartition
...
consumer.seek_to_beginning(TopicPartition(topic_name,0))
There are 32 partitions in this topic (indexed from 0 to 31)
tps = [TopicPartition(topic_name, i) for i in range(32)]
consumer.seek_to_beginning(*tps)  # unpack the list: the method takes *partitions, not a list