Set consumer offset
If I want to get all messages starting from a given offset, I run this shell command:
/usr/share/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic 1-codicefiscale-21032022122736-i \
    --partition 0 \
    --offset 5
How can I do the same thing with the kafka-python library?
from kafka import KafkaConsumer

def demoListMessages(topicName):
    # Subscribes to the whole topic; nothing here selects a partition
    # or a starting offset.
    consumer = KafkaConsumer(topicName,
                             bootstrap_servers="localhost:9092",
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=1000)
    for msg in consumer:
        print(msg.value)

demoListMessages("1-codicefiscale-21032022122736-i")
I had to set the partition. This snippet works for me.
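from kafka import KafkaConsumer, TopicPartition

def demoListPageMessages(topicName):
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=1000)
    # Assign partition 0 manually instead of subscribing to the topic.
    tp = TopicPartition(topicName, 0)
    consumer.assign([tp])
    # Start reading at offset 5, like --partition 0 --offset 5.
    consumer.seek(tp, 5)
    for msg in consumer:
        print(msg.value)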
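
If you need this in more than one place, the assign-then-seek steps can be wrapped in a small helper. This is only a sketch: read_from_offset, max_messages and demo are names I made up for illustration and are not part of kafka-python, and demo assumes the broker address, topic, partition 0 and offset 5 from the question.

```python
import itertools


def read_from_offset(consumer, tp, offset, max_messages=None):
    """Yield message values from partition `tp`, starting at `offset`.

    `consumer` is expected to behave like kafka.KafkaConsumer
    (assign, seek, iteration). `max_messages` is a hypothetical
    convenience parameter; None means read until the consumer stops.
    """
    # Manual assignment is required before seek() can be used.
    consumer.assign([tp])
    consumer.seek(tp, offset)
    for msg in itertools.islice(consumer, max_messages):
        yield msg.value


def demo():
    # Imported here so the helper above does not need a broker to load.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                             consumer_timeout_ms=1000)
    tp = TopicPartition("1-codicefiscale-21032022122736-i", 0)
    for value in read_from_offset(consumer, tp, 5):
        print(value)
```

Calling demo() against a running broker should behave like the snippet above. The helper takes the consumer as an argument so the same code can be reused for other partitions or offsets.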