Kafka 的 Confluent Python API
Confluent Python API for Kafka
官方 Confluent Kafka 的基本用法出现错误 Python API:
我订阅了:
kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
使用回调:
def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))
for topic_partition in topic_partitions_with_offsets:
print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
产生控制台输出:
without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}
有人可以解释一下吗?为什么我会收到未知分区的回调通知?类似的代码可以完美地使用 Java API.
这是底层 C 库 librdkafka 中的错误。
见 upstream issue.
如果您想从存储的偏移量开始消费,您实际上不需要调用 position() 来检索它们,如果您不更改默认偏移量 -1001,客户端将自动执行此操作。
官方 Confluent Kafka 的基本用法出现错误 Python API:
我订阅了:
kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
使用回调:
def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))
for topic_partition in topic_partitions_with_offsets:
print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
产生控制台输出:
without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}
有人可以解释一下吗?为什么我会收到未知分区的回调通知?类似的代码可以完美地使用 Java API.
这是底层 C 库 librdkafka 中的错误。 见 upstream issue.
如果您想从存储的偏移量开始消费,您实际上不需要调用 position() 来检索它们,如果您不更改默认偏移量 -1001,客户端将自动执行此操作。